深度强化学习-Double DQN算法原理与代码
引言
Double Deep Q Network(DDQN)是对DQN算法的改进,有效提升了算法的性能,本文就带领大家了解一下这个算法,Double Q-learning算法的论文和代码链接见下方。
代码:https://github.com/indigoLovee/DDQN
喜欢的话请点个star~。
1 DDQN算法简介
Q-learning算法采用来更新动作价值,这样会导致“最大化偏差”(maximization bias),使得估计的动作价值偏大。
我们来看一个最大化偏差的例子。上图为一个回合制任务,Markov决策过程的状态空间为,回合开始时总是处在状态,可以选择的动作空间。如果选择动作,则可以到达状态,该步奖励为+0;如果选择动作,则可以达到终止状态并获得奖励+1。从状态出发,有很多可选的动作(例如有1000个可选的动作),但是这些动作都指向终止状态,并且奖励都服从均值为0、方差为100的正态分布。从理论上说,这个例子的最优价值函数为:,,最优策略应当是。但是,如果采用Q-learning算法,在中间过程中会走一些弯路:在学习过程中,从出发的某些动作会采样到比较大的奖励值,从而导致会比较大,使得在时更倾向于选择。这样的错误需要大量的数据才能纠正。(由于公式中的下标无法用中文表示,所以都用英文来代替,其中start表示开始,middle表示中间,end表示终止,goMiddle表示去中间,goEnd表示去终止)
为了解决这个问题,双重Q学习(Double Q-learning)算法使用两个独立的动作价值估计值和,用或来代替Q-learning中的。由于和是相互独立的估计,所以,其中,这样就消除了偏差。在双重学习的过程中,和都需要逐渐更新。
对于DQN算法也有同样的结论,将Double Q-learning应用于DQN,得到了双重深度Q网络(Double Deep Q Network, Double DQN)。
2 DDQN算法原理
DDQN算法相较于DQN算法,其主要针对后者的过估计问题,改变了目标值的计算方法,其他地方与DQN算法完全一致。因此,如果已经知道了DQN算法的所有理论,那么DDQN算法就非常简单啦,这篇博文的理论部分主要讲解DDQN算法的改进部分。如果有小伙伴对DQN算法不太了解,可以参考我的这篇blog:深度强化学习-DQN算法原理与代码,里面详细介绍了DQN算法的相关理论并进行了仿真验证。
由于Double Q-learning要求构建两个动作价值函数,一个用于估计动作,另外一个用于估计该动作的价值。但是考虑到DQN算法中已经有了评估网络和目标网络两个网络,所以DDQN算法在估计回报时只需要用评估网络确定动作,用目标网络确定动作价值即可,不要另外构建新的网络。因此,只需要将DQN算法中计算目标值的方法:
更换为:
其中表示目标值,表示折扣系数,表示评估网络参数,表示目标网络参数。
这样就得到了带经验回放的DDQN算法,下图是原论文中的DDQN算法与DQN算法的对比结果。
其中蓝色曲线为DDQN算法,红色曲线为DQN算法,通过Value estimates曲线可以明显看出DDQN算法的值估计较DQN算法低,有效解决了过估计问题;另外通过Score累计奖励曲线可以看出,DDQN算法的累计奖励更高,拥有更好的决策能力。
3 DDQN算法伪代码
4 仿真验证
经验回放采用集中式均匀回放,代码如下(脚本buffer.py):
import numpy as np
class ReplayBuffer:
def __init__(self, state_dim, action_dim, max_size, batch_size):
self.mem_size = max_size
self.batch_size = batch_size
self.mem_cnt = 0
self.state_memory = np.zeros((self.mem_size, state_dim))
self.action_memory = np.zeros((self.mem_size, ))
self.reward_memory = np.zeros((self.mem_size, ))
self.next_state_memory = np.zeros((self.mem_size, state_dim))
self.terminal_memory = np.zeros((self.mem_size, ), dtype=np.bool)
def store_transition(self, state, action, reward, state_, done):
mem_idx = self.mem_cnt % self.mem_size
self.state_memory[mem_idx] = state
self.action_memory[mem_idx] = action
self.reward_memory[mem_idx] = reward
self.next_state_memory[mem_idx] = state_
self.terminal_memory[mem_idx] = done
self.mem_cnt += 1
def sample_buffer(self):
mem_len = min(self.mem_size, self.mem_cnt)
batch = np.random.choice(mem_len, self.batch_size, replace=False)
states = self.state_memory[batch]
actions = self.action_memory[batch]
rewards = self.reward_memory[batch]
states_ = self.next_state_memory[batch]
terminals = self.terminal_memory[batch]
return states, actions, rewards, states_, terminals
def ready(self):
return self.mem_cnt > self.batch_size
目标网络采用软更新方式更新网络权值,DDQN算法的实现代码如下(脚本DDQN.py):
import torch as T
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from buffer import ReplayBuffer
device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
class DeepQNetwork(nn.Module):
def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
super(DeepQNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, fc1_dim)
self.fc2 = nn.Linear(fc1_dim, fc2_dim)
self.q = nn.Linear(fc2_dim, action_dim)
self.optimizer = optim.Adam(self.parameters(), lr=alpha)
self.to(device)
def forward(self, state):
x = T.relu(self.fc1(state))
x = T.relu(self.fc2(x))
q = self.q(x)
return q
def save_checkpoint(self, checkpoint_file):
T.save(self.state_dict(), checkpoint_file, _use_new_zipfile_serialization=False)
def load_checkpoint(self, checkpoint_file):
self.load_state_dict(T.load(checkpoint_file))
class DDQN:
def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim, ckpt_dir,
gamma=0.99, tau=0.005, epsilon=1.0, eps_end=0.01, eps_dec=5e-7,
max_size=1000000, batch_size=256):
self.gamma = gamma
self.tau = tau
self.epsilon = epsilon
self.eps_min = eps_end
self.eps_dec = eps_dec
self.batch_size = batch_size
self.checkpoint_dir = ckpt_dir
self.action_space = [i for i in range(action_dim)]
self.q_eval = DeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
fc1_dim=fc1_dim, fc2_dim=fc2_dim)
self.q_target = DeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
fc1_dim=fc1_dim, fc2_dim=fc2_dim)
self.memory = ReplayBuffer(state_dim=state_dim, action_dim=action_dim,
max_size=max_size, batch_size=batch_size)
self.update_network_parameters(tau=1.0)
def update_network_parameters(self, tau=None):
if tau is None:
tau = self.tau
for q_target_params, q_eval_params in zip(self.q_target.parameters(), self.q_eval.parameters()):
q_target_params.data.copy_(tau * q_eval_params + (1 - tau) * q_target_params)
def remember(self, state, action, reward, state_, done):
self.memory.store_transition(state, action, reward, state_, done)
def choose_action(self, observation, isTrain=True):
state = T.tensor([observation], dtype=T.float).to(device)
actions = self.q_eval.forward(state)
action = T.argmax(actions).item()
if (np.random.random() < self.epsilon) and isTrain:
action = np.random.choice(self.action_space)
return action
def decrement_epsilon(self):
self.epsilon = self.epsilon - self.eps_dec \
if self.epsilon > self.eps_min else self.eps_min
def learn(self):
if not self.memory.ready():
return
states, actions, rewards, next_states, terminals = self.memory.sample_buffer()
batch_idx = np.arange(self.batch_size)
states_tensor = T.tensor(states, dtype=T.float).to(device)
rewards_tensor = T.tensor(rewards, dtype=T.float).to(device)
next_states_tensor = T.tensor(next_states, dtype=T.float).to(device)
terminals_tensor = T.tensor(terminals).to(device)
with T.no_grad():
q_ = self.q_eval.forward(next_states_tensor)
next_actions = T.argmax(q_, dim=-1)
q_ = self.q_target.forward(next_states_tensor)
q_[terminals_tensor] = 0.0
target = rewards_tensor + self.gamma * q_[batch_idx, next_actions]
q = self.q_eval.forward(states_tensor)[batch_idx, actions]
loss = F.mse_loss(q, target.detach())
self.q_eval.optimizer.zero_grad()
loss.backward()
self.q_eval.optimizer.step()
self.update_network_parameters()
self.decrement_epsilon()
def save_models(self, episode):
self.q_eval.save_checkpoint(self.checkpoint_dir + 'Q_eval/DDQN_q_eval_{}.pth'.format(episode))
print('Saving Q_eval network successfully!')
self.q_target.save_checkpoint(self.checkpoint_dir + 'Q_target/DDQN_Q_target_{}.pth'.format(episode))
print('Saving Q_target network successfully!')
def load_models(self, episode):
self.q_eval.load_checkpoint(self.checkpoint_dir + 'Q_eval/DDQN_q_eval_{}.pth'.format(episode))
print('Loading Q_eval network successfully!')
self.q_target.load_checkpoint(self.checkpoint_dir + 'Q_target/DDQN_Q_target_{}.pth'.format(episode))
print('Loading Q_target network successfully!')
算法仿真环境为gym库中的LunarLander-v2,因此需要先配置好gym库。进入Anaconda3中对应的Python环境中,执行下面的指令
pip install gym
但是,这样安装的gym库只包括少量的内置环境,如算法环境、简单文字游戏和经典控制环境,无法使用LunarLander-v2。因此还需要安装一些其他依赖项,具体可以参考我的这篇blog:AttributeError: module ‘gym.envs.box2d‘ has no attribute ‘LunarLander‘ 解决办法
让智能体在环境中训练500轮,训练代码如下(脚本train.py):
import gym
import numpy as np
import argparse
from DDQN import DDQN
from utils import plot_learning_curve, create_directory
parser = argparse.ArgumentParser()
parser.add_argument('--max_episodes', type=int, default=500)
parser.add_argument('--ckpt_dir', type=str, default='./checkpoints/DDQN/')
parser.add_argument('--reward_path', type=str, default='./output_images/avg_reward.png')
parser.add_argument('--epsilon_path', type=str, default='./output_images/epsilon.png')
args = parser.parse_args()
def main():
env = gym.make('LunarLander-v2')
agent = DDQN(alpha=0.0003, state_dim=env.observation_space.shape[0], action_dim=env.action_space.n,
fc1_dim=256, fc2_dim=256, ckpt_dir=args.ckpt_dir, gamma=0.99, tau=0.005, epsilon=1.0,
eps_end=0.05, eps_dec=5e-4, max_size=1000000, batch_size=256)
create_directory(args.ckpt_dir, sub_dirs=['Q_eval', 'Q_target'])
total_rewards, avg_rewards, eps_history = [], [], []
for episode in range(args.max_episodes):
total_reward = 0
done = False
observation = env.reset()
while not done:
action = agent.choose_action(observation, isTrain=True)
observation_, reward, done, info = env.step(action)
agent.remember(observation, action, reward, observation_, done)
agent.learn()
total_reward += reward
observation = observation_
total_rewards.append(total_reward)
avg_reward = np.mean(total_rewards[-100:])
avg_rewards.append(avg_reward)
eps_history.append(agent.epsilon)
print('EP:{} reward:{} avg_reward:{} epsilon:{}'.
format(episode + 1, total_reward, avg_reward, agent.epsilon))
if (episode + 1) % 50 == 0:
agent.save_models(episode + 1)
episodes = [i for i in range(args.max_episodes)]
plot_learning_curve(episodes, avg_rewards, 'Reward', 'reward', args.reward_path)
plot_learning_curve(episodes, eps_history, 'Epsilon', 'epsilon', args.epsilon_path)
if __name__ == '__main__':
main()
训练时还会用到画图函数和创建文件夹函数,它们均放置在utils.py脚本中,具体代码如下:
import os
import matplotlib.pyplot as plt
def plot_learning_curve(episodes, records, title, ylabel, figure_file):
plt.figure()
plt.plot(episodes, records, linestyle='-', color='r')
plt.title(title)
plt.xlabel('episode')
plt.ylabel(ylabel)
plt.show()
plt.savefig(figure_file)
def create_directory(path: str, sub_dirs: list):
for sub_dir in sub_dirs:
if os.path.exists(path + sub_dir):
print(path + sub_dir + ' is already exist!')
else:
os.makedirs(path + sub_dir, exist_ok=True)
print(path + sub_dir + ' create successfully!')
仿真结果如下图所示:
通过平均奖励曲线可以看出,大概迭代到300步左右时DDQN算法趋于收敛。下图为DQN算法的平均奖励曲线:
通过对比可以明显看出DDQN算法较DQN算法,具有更快的收敛速度,算法的性能得到有效提升。
今天的文章深度强化学习-Double DQN算法原理与代码分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/27083.html