Dueling Deep Q Network(Dueling DQN)是对DQN算法的改进,有效提升了算法的性能。如果对DQN算法还不太了解的话,可以参考我的这篇博文:深度强化学习-DQN算法原理与代码,里面详细讲述了DQN算法的原理和代码实现。本文就带领大家了解一下Dueling DQN算法,论文链接见下方。

论文:http://proceedings.mlr.press/v48/wangf16.pdf

代码:https://github.com/indigoLovee/Dueling_DQN

喜欢的话可以点个star呢。

1 Dueling DQN算法简介

Dueling DQN算法提出了一种新的神经网络结构——对偶网络(duel network)。网络的输入与DQN和DDQN算法的输入一样,均为状态信息,但是输出却有所不同。Dueling DQN算法的输出包括两个分支,分别是该状态的状态价值V(标量)和每个动作的优势值A(与动作空间同维度的向量)。DQN和DDQN算法的输出只有一个分支,为该状态下每个动作的动作价值(与动作空间同维度的向量)。具体的网络结构如下图所示:

单分支网络结构(DQN和DDQN)

 对偶网络结构(Dueling DQN)

在DQN算法的网络结构中,输入为一张或多张照片,利用卷积网络提取图像特征,之后经过全连接层输出每个动作的动作价值;在Dueling DQN算法的网络结构中,输入同样为一张或多张照片,然后利用卷积网络提取图像特征获取特征向量,输出时会经过两个全连接层分支,分别对应状态价值和优势值,最后将状态价值和优势值相加即可得到每个动作的动作价值(即绿色连线操作)。

为什么要采用对偶网络结构?

其实动机很简单:很多游戏的Q值,只受当前状态影响,无论采取什么动作区别不大。如下图所示:

 这是Atari game中的一个赛车游戏,Value表示状态价值,Advantage表示动作优势值,图中黄色部分表示注意力。当前方没有车辆时,智能体左右移动并没有影响,说明动作对Q值没有影响,但是状态对Q值很有影响。从上面两幅图可以看出,Value更加关注远方道路,因为开的越远,对应的状态值越大;Advantage没有特别注意的地方,说明动作没有影响。当前方存在车辆阻挡时,智能体的动作选择至关重要,说明动作对Q值存在影响,同样状态对Q值也会存在影响。从下面两幅图可以看出,Value同样更加关注远方道路,但是Advantge此时会关注前方车辆,因为如果不采取相应动作,智能体很有可能会发生碰撞,分数就会很低。对偶网络思想符合很多场景的设定。

2 Dueling DQN算法原理

2.1 优势函数(Advantage function)

在讲优势函数之前,我们先来回顾一些基本概念吧。

折扣回报(Discounted return):

U_{t}=R_{t}+\gamma R_{t+1}+\gamma ^{2}R_{t+1}+\gamma ^{3}R_{t+3}+\cdots

U_{t} 表示从t时刻开始,未来所有奖励的加权求和。在t时刻,U_{t}是未知的,它依赖于未来所有状态和动作。

动作价值函数(Action-value function):

Q_{\pi }(s_{t},a_{t})=E\left [ U_{t}\mid S_{t}=s_{t},A_{t}=a_{t} \right ]

 Q_{\pi }(s_{t},a_{t})是回报U_{t}的条件期望,将t+1时刻以后的状态和动作全部消掉。Q_{\pi }(s_{t},a_{t})依赖于状态s_{t}、动作a_{t}以及策略\pi

状态价值函数(State-value function):

V_{\pi }(s_{t})=E\left [ Q_{\pi }(s_{t},A) \right ]

V_{\pi }(s_{t})Q_{\pi }(s_{t},a_{t})的期望,将Q_{\pi }(s_{t},a_{t})中的动作a_{t}消掉。V_{\pi }(s_{t})依赖于状态s_{t}  和策略\pi

最优动作价值函数(Optimal action-value function):

Q^{\ast }(s,a)=max_{\pi }Q_{\pi }(s,a)

 动作价值函数Q_{\pi }(s_{t},a_{t})依赖于策略\pi,对Q_{\pi }(s_{t},a_{t})关于\pi求最大值,消除掉\pi,得到Q^{\ast }(s,a)Q^{\ast }(s,a)只依赖于状态s和动作a,不依赖于策略\piQ^{\ast }(s,a)用于评价在状态s下做动作a的好坏。

 最优状态价值函数(Optimal state_value function):

V^{\ast }(s)=max_{\pi }V_{\pi }(s)

V_{\pi }(s)关于\pi求 最大值,消除掉\pi,得到V^{\ast }(s)V^{\ast }(s)只依赖于状态s, 不依赖于策略\piV^{\ast }(s)用于评价状态s的好坏。

 最优优势函数(Optimal advantage function):

A^{\ast }(s,a)=Q^{\ast }(s,a)-V^{\ast }(s)

 将V^{\ast }(s)作为baseline,则优势函数A^{\ast }(s,a)表示动作a相对于baseline的优势。动作a越好,其优势越大。

 2.2 优势函数性质

 为了推导优势函数的性质,这里给出定理1(大家可以从确定性策略角度来理解定理1,这里不给出该定理的证明):

定理1:V^{\ast }(s)=max_{a}Q^{\ast }(s,a)

针对优势函数

A^{\ast }(s,a)=Q^{\ast }(s,a)-V^{\ast }(s)

 两边同时对动作a取最大值,得到

max_{a}A^{\ast }(s,a)=max_{a}Q^{\ast }(s,a)-V^{\ast }(s)

 根据定理1进一步得到

max_{a}A^{\ast }(s,a)=0

 接着对优势函数做变换,得到

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)

不妨在上式右边减去 max_{a}A^{\ast }(s,a),由于其等于0,等号不变,得到

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)-max_{a}A^{\ast }(s,a)

看到这里小伙伴可能会疑惑,为什么要 减去 max_{a}A^{\ast }(s,a),这不是多此一举吗?不急,这个后面2.4节会解释。至此,我们得到了定理2,这是一个非常重要的等式,Dueling Network就是根据定理2得到的。

定理2:Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)-max_{a}A^{\ast }(s,a)

 2.3 对偶网络

在DQN算法中,我们利用神经网络Q(s,a;w)来近似最优动作值函数Q^{\ast }(s,a),从而得到最优策略,其中w为网络参数。但是在Dueling Network中,我们利用神经网路A(s,a;w^{A})来近似最优优势函数A^{\ast }(s,a),利用神经网络V(s;w^{V})来近似最优状态值函数V^{\ast }(s),其中w^{A}w^{V}均为网络参数。

根据定理2,将其中的V^{\ast }(s)用神经网络A(s,a;w^{A})来替代,A^{\ast }(s,a)用神经网络A(s,a;w^{A})来代替,得到

Q(s,a;w^{A},w^{V})=V(s;w^{V})+A(s,a;w^{A})-max_{a}A(s,a;w^{A})

其中Q(s,a;w^{A},w^{V})近似Q^{\ast }(s,a),它被成为Dueling Network。令w=(w^{A},w^{V}),则

Q(s,a;w)=V(s;w^{V})+a(s,a;w^{A})-max_{a}A(s,a;w^{A})

现在,我们发现Dueling Network与DQN完全一致,包括输入和输出,唯一不同在于神经网络的结构。Dueling Network的网络结构更好,因此它的表现比DQN更好。另外,DQN中的一些技巧也同样可以用于Dueling Network中,例如优先经验回放、Double DQN和Multi-step TD target等。

2.4 唯一性

 观察下面两个等式:

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)-max_{a}A^{\ast }(s,a)

下面的式子相较于上面的式子只是减去了max_{a}A^{\ast }(s,a),前面已经讨论过 max_{a}A^{\ast }(s,a)=0,既然如此,这一项有必要的吗,直接用上面的式子不就行了吗?

答案是有必要的。因为上面的式子存在一个缺点:无法通过学习Q^{\ast }(s,a)来唯一确定V^{\ast }(s)A^{\ast }(s,a)举个栗子:令{V}'=V^{\ast }+10{A}'=A^{\ast }-10,那么

Q^{\ast }(s,a)=V^{\ast }(s)+A^{\ast }(s,a)={V}'(s)+{A}'(s,a)

因此结果不唯一。这种不唯一性会带来什么问题呢?

如果神经网络V和A上下波动,幅度相同,方向相反,那么网络的输出相同,但是由于两个网络都在上下波动,两个网络都不稳定,因此都训练不好。

针对上面式子的不唯一性,下面的式子可以解决。通过加入最大化强可以有效避免不唯一性问题,仍然用上面的栗子来分析,令{V}'=V^{\ast }+10{A}'=A^{\ast }-10,那么

Q^{\ast }(s,a)=(V^{\ast }(s)+10)+(A^{\ast }(s,a)-10)-max_{a}(A^{\ast }(s,a)-10) =V^{\ast }(s)+A^{\ast }(s,a)+10

 此时Q^{\ast }(s,a)发生了变化,解决了不唯一性问题,这就是要加上最大化项的原因了。

2.5 两种数学形式

 通过前面的推导,我们得到了Dueling Network的数学形式为

Q(s,a;w)=V(s;w^{V})+a(s,a;w^{A})-max_{a}A(s,a;w^{A})

 实际中将最大化形式变成均值形式效果更好,更稳定,其数学形式如下

Q(s,a;w)=V(s;w^{V})+A(s,a;w^{A})-mean_{a}A(s,a;w^{A})

 这种替换没有任何理论依据,就是实验效果好,简单!粗暴!有效!

3 Dueling DQN算法代码

经验回放采用集中式均匀回放,代码如下(脚本buffer.py):

import numpy as np


class ReplayBuffer:
    def __init__(self, state_dim, action_dim, max_size, batch_size):
        self.mem_size = max_size
        self.mem_cnt = 0
        self.batch_size = batch_size

        self.state_memory = np.zeros((self.mem_size, state_dim))
        self.action_memory = np.zeros((self.mem_size, ))
        self.reward_memory = np.zeros((self.mem_size, ))
        self.next_state_memory = np.zeros((self.mem_size, state_dim))
        self.terminal_memory = np.zeros((self.mem_size, ), dtype=np.bool)

    def store_transition(self, state, action, reward, state_, done):
        mem_idx = self.mem_cnt % self.mem_size

        self.state_memory[mem_idx] = state
        self.action_memory[mem_idx] = action
        self.reward_memory[mem_idx] = reward
        self.next_state_memory[mem_idx] = state_
        self.terminal_memory[mem_idx] = done

        self.mem_cnt += 1

    def sample_buffer(self):
        mem_len = min(self.mem_size, self.mem_cnt)

        batch = np.random.choice(mem_len, self.batch_size, replace=False)

        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.next_state_memory[batch]
        terminals = self.terminal_memory[batch]

        return states, actions, rewards, states_, terminals

    def ready(self):
        return self.mem_cnt > self.batch_size

目标网络的更新方式为软更新,Dueling DQN的实现代码如下(脚本Dueling_DQN.py):

import torch as T
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from buffer import ReplayBuffer

device = T.device("cuda:0" if T.cuda.is_available() else "cpu")


class DuelingDeepQNetwork(nn.Module):
    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
        super(DuelingDeepQNetwork, self).__init__()

        self.fc1 = nn.Linear(state_dim, fc1_dim)
        self.fc2 = nn.Linear(fc1_dim, fc2_dim)
        self.V = nn.Linear(fc2_dim, 1)
        self.A = nn.Linear(fc2_dim, action_dim)

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)
        self.to(device)

    def forward(self, state):
        x = T.relu(self.fc1(state))
        x = T.relu(self.fc2(x))

        V = self.V(x)
        A = self.A(x)

        return V, A

    def save_checkpoint(self, checkpoint_file):
        T.save(self.state_dict(), checkpoint_file, _use_new_zipfile_serialization=False)

    def load_checkpoint(self, checkpoint_file):
        self.load_state_dict(T.load(checkpoint_file))


class DuelingDQN:
    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim, ckpt_dir,
                 gamma=0.99, tau=0.005, epsilon=1.0, eps_end=0.01, eps_dec=5e-4,
                 max_size=1000000, batch_size=256):
        self.gamma = gamma
        self.tau = tau
        self.batch_size = batch_size
        self.epsilon = epsilon
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.checkpoint_dir = ckpt_dir
        self.action_space = [i for i in range(action_dim)]

        self.q_eval = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                          fc1_dim=fc1_dim, fc2_dim=fc2_dim)
        self.q_target = DuelingDeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                            fc1_dim=fc1_dim, fc2_dim=fc2_dim)

        self.memory = ReplayBuffer(state_dim=state_dim, action_dim=action_dim,
                                   max_size=max_size, batch_size=batch_size)

        self.update_network_parameters(tau=1.0)

    def update_network_parameters(self, tau=None):
        if tau is None:
            tau = self.tau

        for q_target_params, q_eval_params in zip(self.q_target.parameters(), self.q_eval.parameters()):
            q_target_params.data.copy_(tau * q_eval_params + (1 - tau) * q_target_params)

    def remember(self, state, action, reward, stata_, done):
        self.memory.store_transition(state, action, reward, stata_, done)

    def decrement_epsilon(self):
        self.epsilon = self.epsilon - self.eps_dec \
            if self.epsilon > self.eps_min else self.eps_min

    def choose_action(self, observation, isTrain=True):
        state = T.tensor([observation], dtype=T.float).to(device)
        _, A = self.q_eval.forward(state)
        action = T.argmax(A).item()

        if (np.random.random() < self.epsilon) and isTrain:
            action = np.random.choice(self.action_space)

        return action

    def learn(self):
        if not self.memory.ready():
            return

        states, actions, rewards, next_states, terminals = self.memory.sample_buffer()
        batch_idx = T.arange(self.batch_size, dtype=T.long).to(device)
        states_tensor = T.tensor(states, dtype=T.float).to(device)
        actions_tensor = T.tensor(actions, dtype=T.long).to(device)
        rewards_tensor = T.tensor(rewards, dtype=T.float).to(device)
        next_states_tensor = T.tensor(next_states, dtype=T.float).to(device)
        terminals_tensor = T.tensor(terminals).to(device)

        with T.no_grad():
            V_, A_ = self.q_target.forward(next_states_tensor)
            q_ = V_ + A_ - T.mean(A_, dim=-1, keepdim=True)
            q_[terminals_tensor] = 0.0
            target = rewards_tensor + self.gamma * T.max(q_, dim=-1)[0]
        V, A = self.q_eval.forward(states_tensor)
        q = (V + A - T.mean(A, dim=-1, keepdim=True))[batch_idx, actions_tensor]

        loss = F.mse_loss(q, target.detach())
        self.q_eval.optimizer.zero_grad()
        loss.backward()
        self.q_eval.optimizer.step()

        self.update_network_parameters()
        self.decrement_epsilon()

    def save_models(self, episode):
        self.q_eval.save_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_{}.pth'.format(episode))
        print('Saving Q_eval network successfully!')
        self.q_target.save_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_{}.pth'.format(episode))
        print('Saving Q_target network successfully!')

    def load_models(self, episode):
        self.q_eval.load_checkpoint(self.checkpoint_dir + 'Q_eval/DuelingDQN_q_eval_{}.pth'.format(episode))
        print('Loading Q_eval network successfully!')
        self.q_target.load_checkpoint(self.checkpoint_dir + 'Q_target/DuelingDQN_Q_target_{}.pth'.format(episode))
        print('Loading Q_target network successfully!')

算法仿真环境为gym库中的LunarLander-v2,因此需要先配置好gym库。进入Anaconda3中对应的Python环境中,执行下面的指令

pip install gym

但是,这样安装的gym库只包括少量的内置环境,如算法环境、简单文字游戏和经典控制环境,无法使用LunarLander-v2。因此还需要安装一些其他依赖项,具体可以参考我的这篇博文:AttributeError: module ‘gym.envs.box2d‘ has no attribute ‘LunarLander‘ 解决办法

让智能体在环境中训练500轮,训练代码如下(脚本train.py):

import gym
import numpy as np
import argparse
from utils import plot_learning_curve, create_directory
from Dueling_DQN import DuelingDQN

parser = argparse.ArgumentParser()
parser.add_argument('--max_episodes', type=int, default=500)
parser.add_argument('--ckpt_dir', type=str, default='./checkpoints/DuelingDQN/')
parser.add_argument('--reward_path', type=str, default='./output_images/reward.png')
parser.add_argument('--epsilon_path', type=str, default='./output_images/epsilon.png')

args = parser.parse_args()


def main():
    env = gym.make('LunarLander-v2')
    agent = DuelingDQN(alpha=0.0003, state_dim=env.observation_space.shape[0], action_dim=env.action_space.n,
                       fc1_dim=256, fc2_dim=256, ckpt_dir=args.ckpt_dir, gamma=0.99, tau=0.005, epsilon=1.0,
                       eps_end=0.05, eps_dec=5e-4, max_size=1000000, batch_size=256)
    create_directory(args.ckpt_dir, sub_dirs=['Q_eval', 'Q_target'])
    total_rewards, avg_rewards, eps_history = [], [], []

    for episode in range(args.max_episodes):
        total_reward = 0
        done = False
        observation = env.reset()
        while not done:
            action = agent.choose_action(observation, isTrain=True)
            observation_, reward, done, info = env.step(action)
            agent.remember(observation, action, reward, observation_, done)
            agent.learn()
            total_reward += reward
            observation = observation_

        total_rewards.append(total_reward)
        avg_reward = np.mean(total_rewards[-100:])
        avg_rewards.append(avg_reward)
        eps_history.append(agent.epsilon)
        print('EP:{} reward:{} avg_reward:{} epsilon:{}'.
              format(episode + 1, total_reward, avg_reward, agent.epsilon))

        if (episode + 1) % 50 == 0:
            agent.save_models(episode + 1)

    episodes = [i for i in range(args.max_episodes)]
    plot_learning_curve(episodes, avg_rewards, 'Reward', 'reward', args.reward_path)
    plot_learning_curve(episodes, eps_history, 'Epsilon', 'epsilon', args.epsilon_path)


if __name__ == '__main__':
    main()

训练时还会用到画图函数和创建文件夹函数,它们均放置在utils.py脚本中,具体代码如下:

import os
import matplotlib.pyplot as plt


def plot_learning_curve(episodes, records, title, ylabel, figure_file):
    plt.figure()
    plt.plot(episodes, records, linestyle='-', color='r')
    plt.title(title)
    plt.xlabel('episode')
    plt.ylabel(ylabel)

    plt.show()
    plt.savefig(figure_file)


def create_directory(path: str, sub_dirs: list):
    for sub_dir in sub_dirs:
        if os.path.exists(path + sub_dir):
            print(path + sub_dir + ' is already exist!')
        else:
            os.makedirs(path + sub_dir, exist_ok=True)
            print(path + sub_dir + ' create successfully!')

仿真结果如下图所示:

 平均累积奖励曲线

epsilon变化曲线

 通过平均累积奖励可以看出,Dueling DQN算法大约在250步时趋于收敛。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐