强化学习(三) - Gym库介绍和使用,Markov决策程序实例,动态规划决策实例

1. 引言

在这个部分补充之前马尔科夫决策和动态规划部分的代码。在以后的内容我会把相关代码都附到相关内容的后面。本部分代码和将来的代码会参考《深度强化学习原理与python实现》与Udacity的课程《Reinforcement Learning》。

2. Gym库

Gym库(http://gym.openai.com/)是OpenAI推出的强化学习实验环境库,它用python语言实现了离散时间智能体/环境接口中的环境部分。除了依赖少量的商业库外,整个项目时开源免费的。
在这里插入图片描述

Gym库内置上百种实验环境,包括以下几类。

  • 算法环境:包括一些字符串处理等传统计算机方法的实验环境。
  • 简单文本环境:包括几个用文本表示的简单游戏。
  • 经典控制环境:包括一些简单几何体运动,常用于经典强化学习算法的研究。
  • Atari游戏环境:包括数十个Atari 2600游戏,具有像素化的图形界面,希望玩家尽可能夺得高分。
  • MuJoCo环境:利用收费的MuJoCo运动引擎进行连续性的控制任务。
  • 机械控制环境:关于机械臂的抓取和控制等。

2.1 Gym库安装

Python版本要求为 3.5+(在以后的内容中我将使用python 3.6)。然后就可以是使用pip安装Gym库

首先升级pip,

pip install --upgrade pip

以下命令为最小安装Gym环境,

pip install gym
利用源代码构建

如果愿意,还可以直接克隆gym Git存储库。若想要修改Gym本身或添加环境时,这是特别有用的。下载和安装使用:

git clone https://github.com/openai/gym
cd gym
pip install -e .

之后可以运行pip install -e .[all]来执行包含所有环境的完整安装。这需要安装多个涉及的依赖项,包括cmake和一个最近的pip版本。

2.2 Gym库使用

2.2.1 环境(enviroment)

这里有一个运行程序的最简单的例子。这将为1000个时间步骤运行CartPole-v0环境的一个实例,并在每个步骤中呈现环境。你应该看到一个窗口弹出渲染经典的车杆问题:

import gym
env = gym.make('CartPole-v0')
env.reset()
for _ in range(1000):
    env.render()
    env.step(env.action_space.sample()) # 采取随机行动
env.close()

输出如下:
在这里插入图片描述
通常,我们将结束模拟之前,车杆被允许离开屏幕。现在,忽略关于调用step()的警告,即使这个环境已经返回done = True

如果希望运行其他一些环境,可以尝试用类似MountainCar-v0MsPacman-v0(需要Atari依赖项)或Hopper-v1(需要MuJoCo依赖项)来替换上面的cartpole-v0。环境都来自于Env基类。

若丢失了任何依赖项,应该会得到错误消息来提示丢失了什么。安装缺少的依赖项通常非常简单。对于Hopper-v1需要一个的MuJoCo许可证

代码解释

导入Gym库:

import gym

在导入Gym库后,可以通过make()函数来得到环境对象。每一个环境都有一个ID,它是形如“Xxxxx-v0”的字符串,如“CartPloe-v0”等。环境名称的最后的部分表示版本号,不同版本的环境可能有不同的行为。使用取出环境“CartPloe-v0”的代码为:

env = gym.make('CartPole-v0')

想要查看当前的Gym库已经注册了哪些环境,可以使用以下代码:

from gym import envs
env_specs = envs.registry.all()
env_ids = [env_spec.id for env_spec in env_specs]
env_ids

每个环境都定义了自己的观测空间和动作空间。环境env的观测空间用env.observation_space表示,动作空间用env.action_space表示。观测空间和动作空间既可以是离散空间(即取值是有限个离散的值),也可以使连续的空间(即取值是连续的)。在Gym库中,离散空间一般用gym.spaces.Discrete类表示,连续空间用gym.spaces.Box类表示。例如,环境‘MountainCar-v0’的动作空间是Box(2,)表示观测可以用2个float值表示;环境‘MountainCar-v0’的动作空间是Dicrete(3),表示动作取自{0,1,2}。对于离散空间gym.spaces.Discrete类实例成员n表示有几个可能的取值;对于连续空间,Box类实例的成员low和high表示每个浮点数的取值范围。

接下来使用环境对象env。首先初始化环境对象env,

env.reset()

该调用能返回智能体的初始观测,是np.array对象。

环境初始化之后就可以使用了。使用环境的核心是使用环境对象的step()方法,具体内容会在下面介绍。

env.step()的参数需要取自动作空间。可以使用以下语句从动作空间中随机选取一个动作。

action = env.action_space.sample()

每次调用env.step()只会让环境前进一步。所以,env.step()往往放在循环结构里,通过循环调用来完成整个回合。

在env.reset()或env.step()后,可以使用以下语句以图形化的方法显示当前的环境。

env.render()

使用完环境后,可以使用下列语句关闭环境。

env.close()

如果绘制和实验的图形界面窗口,那么关闭该窗口的最佳方式是调用env.close()。试图直接关闭图形界面可能会导致内存不能释放,甚至会导致死机。

测试智能体在Gym库中某个任务的性能时,学术界一般最关心100个回合的平均回合奖励。对于有些环境,还会制定一个参考的回合奖励值,当连续100个回合的奖励大于指定的值时,就认为这个任务被解决了。但是,并不是所有的任务都指定了这样的值。对于没有指定值的任务,就无所谓任务被解决了或者没有被解决。

2.2.2 观测(observation)

如果我们想得到一个更好的结果,那我们就不能在每一步都采取随机的行动。我们需要知道我们的行动对环境做了什么可能会更好。

环境的step函数返回的正是我们需要的东西。事实上,step函数会返回四个值。这四个值是

  • observation (object): 一个特定环境的对象,代表智能体对环境的观察。例如,来自摄像头的像素数据,机器人的关节角度和关节速度,或者棋盘游戏中的棋盘状态,和env.reset()返回值的意义相同。。
  • reward(float):前一个动作所获得的奖励量。不同环境下的比例不同,但目标始终是增加你的总奖励。
  • done (boolean):是否要再次重置环境。大多数(但不是全部)任务都被划分为定义明确的事件,done为True表示该事件已经终止。(例如,可能是杆子翻得太厉害,或者失去了最后的生命。)
  • info(dict):对调试有用的诊断信息。它有时会对学习有用(例如,它可能包含环境最后一次状态变化背后的原始概率)。然而,你的智能体的评估是不允许使用它来学习的。

这只是经典的 "智能体-环境循环 "的一个实现。每一个时间步,智能体选择一个动作(action),环境返回一个观察(observation)和一个奖励(reward)
在这里插入图片描述

import gym
env = gym.make('CartPole-v0')
for i_episode in range(20):
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t+1))
            break
env.close()

输出如下:
在这里插入图片描述

[-0.061586   -0.75893141  0.05793238  1.15547541]
[-0.07676463 -0.95475889  0.08104189  1.46574644]
[-0.0958598  -1.15077434  0.11035682  1.78260485]
[-0.11887529 -0.95705275  0.14600892  1.5261692 ]
[-0.13801635 -0.7639636   0.1765323   1.28239155]
[-0.15329562 -0.57147373  0.20218013  1.04977545]
Episode finished after 14 timesteps
[-0.02786724  0.00361763 -0.03938967 -0.01611184]
[-0.02779488 -0.19091794 -0.03971191  0.26388759]
[-0.03161324  0.00474768 -0.03443415 -0.04105167]
2.2.3 空间(spaces)

在上面的示例中,我们从环境的操作空间中随机取样操作。但这些行为到底是什么呢?每个环境都带有一个action_space和一个observation_space。这些属性类型为Space,它们描述了有效动作和观察的格式:

import gym
env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)

离散(Discrete)空间允许一个固定的非负数范围,因此在本例中有效的动作(action)要么是0要么是1。Box空间表示一个n维的盒子,因此有效的观察结果将是一个4个数字的数组。我们也可以检查Box的边界:

print(env.observation_space.high)
#> array([ 2.4       ,         inf,  0.20943951,         inf])
print(env.observation_space.low)
#> array([-2.4       ,        -inf, -0.20943951,        -inf])

这种内省有助于编写适用于许多不同环境的泛型代码。Box离散空间是最常见的空间(space)。我们可以从一个空间取样或检查某物是否属于它:

from gym import spaces
space = spaces.Discrete(8) # Set with 8 elements {0, 1, 2, ..., 7}
x = space.sample()
assert space.contains(x)
assert space.n == 8
2.2.4 程序实例

这里我们通过一个完整的例子来学习如何与Gym库中的环境交互,选用经典控制任务,小车上山(MountainCar-v0)。这里主要介绍交互的代码,而不详细说明这个控制任务及其求解。

首先我们看一下这个任务的观测空间的动作空间

import gym
env = gym.make('MountainCar-v0')
print('观测空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('观测范围 = {} ~ {}'.format(env.observation_space.low, env.observation_space.high))
print('动作数 = {}'.format(env.action_space.n))

输出如下,

观测空间 = Box(2,)
动作空间 = Discrete(3)
观测范围 = [-1.2  -0.07] ~ [0.6  0.07]
动作数 = 3

运行结果告诉我们,观测空间是形状为(2,)的浮点型np.array,而动作空间是取{0, 1, 2}的int型数值。

接下来考虑智能体。智能体往往是我们自己实现的。下面代码给出了一个智能体类——BespokeAgent类。智能体的decide()方法实现了决策功能,而learn()方法实现了学习功能。BespokeAgent类是一个比较简单的类,它只能根据给定的数学表达式进行决策,并且不能有效学习。所以它并不是一个真正意义上的强化学习智能体类。但是,用于演示智能体和环境的交互已经足够了。

class BespokeAgent:
    def __init__(self, env):
        pass

    def decide(self, observation):  # 决策
        position, velocity = observation
        lb = min(-0.09 * (position + 0.25) ** 2 + 0.03, 0.3 * (position + 0.9) ** 4 - 0.008)
        ub = -0.07 * (position + 0.38) ** 2 + 0.06
        if lb < velocity < ub:
            action = 2
        else:
            action = 0
        return action  # 返回动作

    def learn(self, *args):  # 学习
        pass

agent = BespokeAgent(env)

接下来我们试图让智能体与环境交互。play_once()函数可以让智能体和环境交互一个回合。这个函数有4个参数。

  • 参数env是环境类。
  • 参数agent是智能体类。
  • 参数render是bool类型变量,指示在运行过程中是否要图形化显示。如果函数参数render为True,那么在交互过程中会调用env.render()以显示图形化界面,而这个界面可以通过调用env.close()关闭。
  • 参数train是bool类型的变量,指示在运行过程中是否训练智能体。在训练过程中应当设置为True,以调用agent.learn()函数;在测试过程中应当设置为False,使得智能体不变。

这个函数有一个返回值episode_reward,是float类型的数值,表示智能体与环境交互一个回合的回合总奖励。

def play_montecarlo(env, agent, render=False, train=False):
    episode_reward = 0.  # 记录回合总奖励,初始化为0
    observation = env.reset()  # 重置游戏环境,开始新回合
    while True:  # 不断循环,直到回合结束
        if render:  # 判断是否显示
            env.render()  # 显示图形界面,图形界面可以用env.close()语句关闭
        action = agent.decide(observation)
        next_observation, reward, done, _ = env.step(action)  # 执行动作
        episode_reward += reward  # 收集回合奖励
        if train:  # 判断是否训练智能体
            agent.learn(observation, action, reward, done)  # 学习
        if done:  # 回合结束,跳出循环
           break
        observation = next_observation
    return episode_reward  # 返回回合总奖励

我们可以用下列代码让智能体和环境交互一个回合,并在交互过程中图形化显示。交互完毕后,可用env.close()语句关闭图形化界面。

env.seed(0)  # 设置随机数种子,只是为了让结果可以精确复现,一般情况下可删去
episode_reward = play_montecarlo(env, agent, render=True)
print('回合奖励 ={}'.format(episode_reward))
env.close()  # 此语句可关闭图形界面

为了系统评估智能体的性能,下列代码求出了连续交互100回合的平均回合奖励。小车上山环境有一个参考的回合奖励值-110,如果当连续100个回合的平均回合奖励大于-110,则认为这个任务被解决了。BespokeAgent类对应的策略的平均回合奖励大概就在-110左右。

episode_rewards = [play_montecarlo(env, agent) for _ in range(100)]
print('平均回合奖励 ={}'.format(np.mean(episode_rewards)))

3.Markov 决策过程

3.1 实例:悬崖寻路

本节考虑Gym库中悬崖寻路问题(CliffWalking-v0)。悬崖寻路问题是这样一种回合制问题:在一个 4 × 12 4 \times12 4×12的网格中,智能体最开始在左下角的网格,希望移动到右下角的网格。智能体每次可以在上下左右这四个方向中移动一步,每移动一步会惩罚一个单位的奖励。但是,移动有以下的限制。

  • 智能体不能移出网格。如果智能体想执行某个动作移出网格,那么就让本步不移动。但是这个操作仍然会惩罚一个单位的奖励。
  • 如果智能体将要到达最下一排网格(几开始网格和目标网格之间的10个网格),智能体会立刻回到开始网格,并惩罚100个单位的奖励。这10个网格可被视为“悬崖”。

当智能体移动到终点时,回合结束,回合奖励为各步奖励相加。

在这里插入图片描述

3.1.1 实验环境使用

以下代码演示了如何倒入这个环境并查看这个环境的基本信息。

import gym
env = gym.make('CliffWalking-v0')
print('观察空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('状态数量 = {}, 动作数量= {}'.format(env.nS, env.nA))
print('地图大小 = {}'.format(env.shape))

输出为,

观察空间 = Discrete(48)
动作空间 = Discrete(4)
状态数量 = 48, = 4
地图大小 = (4, 12)

这个环境是一个离散的Markov决策过程。在这个Markov决策过程中,每个状态是取自 S = { 0 , 1 , . . . , 46 } \mathcal{S}= \{0,1,...,46\} S={0,1,...,46}的int型数值(加上终止状态则为 S + = 0 , 1 , . . . , 46 , 47 \mathcal{S}^+={0,1,...,46,47} S+=0,1,...,46,47):0表示向上,1表示向右,2表示向下,3表示向左。奖励取自 { − 1 , − 100 } \{-1,-100\} {1,100},遇到悬崖为-100,否则为-1。

下面代码给出了用给出的策略运行一个回合的代码。函数play_once()有两个参数, 一个是环境对象,另外一个是策略policy,它是np.array类型的实例。

import numpy as np
def play_once(env, policy):
    total_reward = 0
    state = env.reset()
    loc = np.unravel_index(state, env.shape)
    print('状态 = {} , 位置={} '.format(state, loc))
    while True:
        action = np.random.choice(env.nA, p=policy[state])
        # step函数,详情见2.2.2 观测
        next_state, reward, done, _ = env.step(action)
        print('状态 = {}, 位置 ={}, 奖励 ={}'.format(state, loc, reward))
        total_reward += reward
        if done:
            break
        state = next_state
    return total_reward

以下代码给出了一个最优策略。optimal_policy最优策略是在开始处向上,接着一 路向右,然后到最右边时向下。

actions = np.ones(env.shape, dtype=int)
actions[-1, :] = 0
actions[:, -1] = 2
optimal_policy = np.eye(4)[actions.reshape(-1)]

下面的代码用最优策略运行一个回合。采用最优策略,从开始网格到目标网格一共要 移动13步,回合总奖励为-13。

total_reward = play_once(env, optimal_policy) 
print('总奖励 ={}'.format(total_reward))

输出为,

状态 = 36, 位置 =(3, 0), 奖励 =-1
状态 = 24, 位置 =(2, 0), 奖励 =-1
状态 = 25, 位置 =(2, 1), 奖励 =-1
状态 = 26, 位置 =(2, 2), 奖励 =-1
状态 = 27, 位置 =(2, 3), 奖励 =-1
状态 = 28, 位置 =(2, 4), 奖励 =-1
状态 = 29, 位置 =(2, 5), 奖励 =-1
状态 = 30, 位置 =(2, 6), 奖励 =-1
状态 = 31, 位置 =(2, 7), 奖励 =-1
状态 = 32, 位置 =(2, 8), 奖励 =-1
状态 = 33, 位置 =(2, 9), 奖励 =-1
状态 = 34, 位置 =(2, 10), 奖励 =-1
状态 = 35, 位置 =(2, 11), 奖励 =-1
总奖励 =-13 

3.1.2 求解Bellman期望方程

接下来考虑策略评估。我们用Bellman期望方程求解给定策略的状态价值和动作价值。 首先来看状态价值。用状态价值表示状态价值的Bellmn期望方程为:
v π ( s ) = ∑ a π ( a ∣ s ) [ ∑ s ′ , r p ( s ′ , r ∣ s , a ) [ r + γ v π ( s ′ ) ] ] , s ∈ S v_\pi(s) = \sum_a \pi(a|s)[\sum_{s',r}p(s',r|s,a)[r+\gamma v_\pi(s')]],s \in \mathcal{S} vπ(s)=aπ(as)[s,rp(s,rs,a)[r+γvπ(s)]],sS
这是一个线性方程组,其标准形式为
v π ( s ) − γ ∑ a ∑ s ′ π ( a ∣ s ) p ( s ′ ∣ s , a ) v π ( s ′ ) = ∑ a π ( a ∣ s ) ∑ s ′ , r r p ( s ′ , r ∣ s , a ) , s ∈ S v_\pi(s)-\gamma\sum_{a}\sum_{s'}\pi(a|s)p(s'|s,a)v_\pi(s') =\sum_{a}\pi(a|s)\sum_{s',r}rp(s',r|s,a),s \in \mathcal{S} vπ(s)γasπ(as)p(ss,a)vπ(s)=aπ(as)s,rrp(s,rs,a),sS
得到标准形式后就可以调用相关函数直接求解。得到状态价值函数后,可以用状态价值表示动作价值的Bellan方程,
q π ( s , a ) = ∑ s ′ , r p ( s ′ , r ∣ s , a ) [ r + γ v π ( s ′ ) ] , s ∈ S , a ∈ A q_\pi(s,a) =\sum_{s',r}p(s',r|s,a)[r+\gamma v_\pi(s')],s \in \mathcal{S},a \in \mathcal{A} qπ(s,a)=s,rp(s,rs,a)[r+γvπ(s)],sS,aA

来求动作价值函数。

函数evaluate_bellman ()实现了上述功能。状态价值求解部分用np.linalg.solve()函数求解标准形式的线性方程组。得到状态价值后,直接计算得到动作价值.

环境的动力系统储存在env.P里,它是一个元组列表,每个元组包括概率、下一状态、奖励\回合结束指示这4个部分。

# 用Bellman方程求解状态价值和动作价值
def evaluate_bellman(env, policy, gamma=1.):
    a, b = np.eye(env.nS), np.zeros((env.nS))
    for state in range(env.nS - 1):
        for action in range(env.nA):
            pi = policy[state][action]
            # print('{}{}:pi:{}'.format(state, action, pi))
            # env.P中存储环境的动力
            for p, next_state, reward, done in env.P[state][action]:
                a[state, next_state] -= (pi * gamma)
                b[state] += (pi * reward * p)
    # np.linalg.solve()函数求解标准形式的线性方程组
    v = np.linalg.solve(a, b)
    q = np.zeros((env.nS, env.nA))
    # 利用状态价值函数求解动作价值函数
    for state in range(env.nS - 1):
        for action in range(env.nA):
            for p, next_state, reward, done in env.P[state][action]:
                q[state][action] += ((reward + gamma * v[next_state]) * p)
    return v, q

接下来使用evaluate_bellman()函数评估给出的策略。以下代码分别评估了一个随机策略和最优确定性策略,并输出了状态价值函数和动作价值函数。

评估随机策略,

policy = np.random.uniform(size=(env.nS, env.nA)) 
policy = policy / np.sum(policy, axis=1)[:, np.newaxis] 
state_values, action_values = evaluate_bellman(env, policy) 
print('状态价值 ={}'.format(state_values)) 
print('动作价值 ={}'.format(action_values))
状态价值 =[-134140.09187241 -134085.47213402 -133971.92139818 -133616.23666259
 -133194.20403738 -132781.56536103 -132435.88940662 -128380.91096168
 ...
 -133680.76568151 -130121.16858665 -115062.2396112        0.        ]
动作价值 =[[-1.34141092e+05 -1.34086472e+05 -1.34152621e+05 -1.34141092e+05]
 [-1.34086472e+05 -1.33972921e+05 -1.34152699e+05 -1.34141092e+05]
 ...
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00]]

评估最优策略,

optimal_state_values, optimal_action_values = evaluate_bellman(env, optimal_policy)
print('最优状态价值 = {}'.format(optimal_state_values))
print('最优动作价值 = {}'.format(optimal_action_values))
最优状态价值 = [[-14. -13. -12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.]
 [-13. -12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.  -2.]
 [-12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.  -2.  -1.]
 [-13. -12. -11. -10.  -9.  -8.  -7.  -6.  -5.  -4.  -3.   0.]]
最优动作价值 = [[ -15.  -14.  -14.  -15.]
 [ -14.  -13.  -13.  -15.]
 [ -13.  -12.  -12.  -14.]
 ...
 [   0.    0.    0.    0.]]

3.1.3 求解Bellman最优方程

对于悬崖寻路这样的数值环境,可以使用线性规划求解。线性规划问题的标准形式为:
minimise         ∑ s ∈ S v ( s ) over         v ( s ) , s ∈ S s.t.        v ( s ) − γ ∑ s ′ , r p ( s ′ , r ∣ s , a ) v ∗ ( s ′ ) ≥ ∑ s ′ , r r p ( s ′ , r ∣ s , a ) , s ∈ S \begin{aligned}\text{minimise} \ \ \ \ \ \ \ \ & \sum_{s \in \mathcal{S}}v(s) \\ \text{over} \ \ \ \ \ \ \ \ & v(s),s \in \mathcal{S}\\ \text{s.t.} \ \ \ \ \ \ \ &v(s)-\gamma\sum_{s',r}p(s',r|s,a)v_*(s')\geq\sum_{s',r}rp(s',r|s,a), s \in \mathcal{S} \end{aligned} minimise        over        s.t.       sSv(s)v(s),sSv(s)γs,rp(s,rs,a)v(s)s,rrp(s,rs,a),sS

其中目标函数中状态价值的系数已经被固定为1。也可以选择其他正实数作为系数。

以下代码使用scipy.optimize.linprog()函数来计算这个动态规划问题。

scipy.optimize.linprog(c,A_ub = None,b_ub = None,A_eq = None,b_eq = None,bounds = None,method = 'interior-point',callback = None,options = None,x0 = None)

min ⁡ x   c T x such that   A u b x ≤ b u b   A e q x = b e q   l ≤ x ≤ u \begin{aligned}\min_{x} \ & c^T x \\\text{such that} & \ A_{ub}x \leq b_{ub} \\ & \ A_{eq}x = b_{eq} \\& \ l \leq x \leq u \end{aligned} xmin such thatcTx Aubxbub Aeqx=beq lxu

其中,c是价值向量;A_ub和b_ub对应线性不等式约束;A_eq和b_eq对应线性等式约束;bounds对应公式中的lb和ub,决策向量的下界和上界;method是求解器的类型,如单纯形法.

官方帮助文档可查看此处.

这个函数的 第0个参数是目标函数中各决策变量在目标函数中的系数,本例中都取1;第1个和第2个 参数是形如“ A x ≤ b Ax≤b Axb”这样的不等式约束的A和b的值。函数optimal_bellman ()刚开始就 计算得到这些值。scipy.optimize.linprog()还有关键字参数bounds,指定决策变量是否为有界的。本例中,决策变量都是无界的。无界也要显式指定,不可以忽略。还有关键字参数 method确定优化方法。默认的优化方法不能处理不等式约束,这里选择了能够处理不等式约束的内点法(interior-point method)。

import scipy
def optimal_bellman(env, gamma=1.):
    p = np.zeros((env.nS, env.nA, env.nS))
    r = np.zeros((env.nS, env.nA))
    for state in range(env.nS - 1):
        for action in range(env.nA):
            for prob, next_state, reward, done in env.P[state][action]:
                p[state, action, next_state] += prob
                r[state, action] += (reward * prob)
    c = np.ones(env.nS)
    a_ub = gamma * p.reshape(-1, env.nS) - np.repeat(np.eye(env.nS), env.nA, axis=0)
    b_ub = -r.reshape(-1)
    a_eq = np.zeros((0, env.nS))
    b_eq = np.zeros(0)
    bounds = [(None, None), ] * env.nS
    res = scipy.optimize.linprog(c, a_ub, b_ub, bounds=bounds, method='interior-point')
    v = res.x
    q = r + gamma * np.dot(p, v)
    return v, q
    
optimal_state_values, optimal_action_values = optimal_bellman(env)
print('最优状态价值 ={}'.format(optimal_state_values))
print('最优动作价值 ={}'.format(optimal_action_values))

求得最优动作价值后,可以用argmax计算出最优确定策略。以下代码给出了最优策略,

optimal_actions = optimal_action_values.argmax(axis=1)
print('最优策略 ={}'.format(optimal_actions))

在pycharm IDE下的总程序为

import gym
import numpy as np
import scipy


# 单回合运行函数
def play_once(env, policy):
    total_reward = 0
    state = env.reset()
    while True:
        action = np.random.choice(env.nA, p=policy[state])
        next_state, reward, done, _ = env.step(action)
        # np.unravel_index函数的作用是获取一个/组int类型的索引值在一个多维数组中的位置。
        loc = np.unravel_index(state, env.shape)
        print('状态 = {}, 位置 ={}, 奖励 ={}'.format(state, loc, reward))
        total_reward += reward
        if done:
            break
        state = next_state
    return total_reward


# 用Bellman方程求解状态价值和动作价值
def evaluate_bellman(env, policy, gamma=1.):
    a, b = np.eye(env.nS), np.zeros((env.nS))
    for state in range(env.nS - 1):
        for action in range(env.nA):
            pi = policy[state][action]
            # print('{}{}:pi:{}'.format(state, action, pi))
            # env.P中存储环境的动力
            for p, next_state, reward, done in env.P[state][action]:
                a[state, next_state] -= (pi * gamma)
                b[state] += (pi * reward * p)
    # np.linalg.solve()函数求解标准形式的线性方程组
    v = np.linalg.solve(a, b)
    q = np.zeros((env.nS, env.nA))
    # 利用状态价值函数求解动作价值函数
    for state in range(env.nS - 1):
        for action in range(env.nA):
            for p, next_state, reward, done in env.P[state][action]:
                q[state][action] += ((reward + gamma * v[next_state]) * p)
    return v, q


# 使用动态规划求解最优策略
def optimal_bellman(env, gamma=1.):
    p = np.zeros((env.nS, env.nA, env.nS))
    r = np.zeros((env.nS, env.nA))
    for state in range(env.nS - 1):
        for action in range(env.nA):
            for prob, next_state, reward, done in env.P[state][action]:
                p[state, action, next_state] += prob
                r[state, action] += (reward * prob)
    c = np.ones(env.nS)
    a_ub = gamma * p.reshape(-1, env.nS) - np.repeat(np.eye(env.nS), env.nA, axis=0)
    b_ub = -r.reshape(-1)
    a_eq = np.zeros((0, env.nS))
    b_eq = np.zeros(0)
    bounds = [(None, None), ] * env.nS
    res = scipy.optimize.linprog(c, a_ub, b_ub, bounds=bounds, method='interior-point')
    v = res.x
    q = r + gamma * np.dot(p, v)
    return v, q


# 主函数
if __name__ == '__main__':
    env = gym.make('CliffWalking-v0')
    print('观察空间 = {}'.format(env.observation_space))
    print('动作空间 = {}'.format(env.action_space))
    print('状态数量 = {}, 动作数量= {}'.format(env.nS, env.nA))
    print('地图大小 = {}'.format(env.shape))

    ''' 
    # 定义最优策略
    actions = np.ones(env.shape, dtype=int)
    actions[-1, :] = 0
    actions[:, -1] = 2
    optimal_policy = np.eye(4)[actions.reshape(-1)]
    

    total_reward = play_once(env, optimal_policy)
    print('总奖励 ={}'.format(total_reward))
    
    optimal_state_values, optimal_action_values = evaluate_bellman(env, optimal_policy)
    print('最优状态价值 = {}'.format(optimal_state_values.reshape(4, -1)))
    print('最优动作价值 = {}'.format(optimal_action_values))
    '''

    '''
    # 定义随机策略
    policy = np.random.uniform(size=(env.nS, env.nA))
    # 归一化
    policy = policy / np.sum(policy, axis=1)[:, np.newaxis]
    state_values, action_values = evaluate_bellman(env, policy)
    print('状态价值 ={}'.format(state_values))
    print('动作价值 ={}'.format(action_values))
    '''

    # 求解最优策略
    optimal_state_values, optimal_action_values = optimal_bellman(env)
    print('最优状态价值 ={}'.format(optimal_state_values))
    print('最优动作价值 ={}'.format(optimal_action_values))

    optimal_actions = optimal_action_values.argmax(axis=1)
    print('最优策略 ={}'.format(optimal_actions))

输出为,

观察空间 = Discrete(48)
动作空间 = Discrete(4)
状态数量 = 48, 动作数量= 4
地图大小 = (4, 12)
最优状态价值 =[-1.40000000e+01 -1.30000000e+01 -1.20000000e+01 -1.10000000e+01
 -1.00000000e+01 -9.00000000e+00 -8.00000000e+00 -7.00000000e+00
 -6.00000000e+00 -5.00000000e+00 -4.00000000e+00 -3.00000000e+00
 -1.30000000e+01 -1.20000000e+01 -1.10000000e+01 -1.00000000e+01
 -9.00000000e+00 -8.00000000e+00 -7.00000000e+00 -6.00000000e+00
 -5.00000000e+00 -4.00000000e+00 -3.00000000e+00 -2.00000000e+00
 -1.20000000e+01 -1.10000000e+01 -1.00000000e+01 -9.00000000e+00
 -8.00000000e+00 -7.00000000e+00 -6.00000000e+00 -5.00000000e+00
 -4.00000000e+00 -3.00000000e+00 -2.00000000e+00 -1.00000000e+00
 -1.30000000e+01 -1.20000000e+01 -1.10000000e+01 -1.00000000e+01
 -9.00000000e+00 -8.00000000e+00 -7.00000000e+00 -6.00000000e+00
 -5.00000000e+00 -4.00000000e+00 -9.99999999e-01  1.82291993e-11]
最优动作价值 =[[ -14.99999999  -13.99999999  -13.99999999  -14.99999999]
 [ -13.99999999  -13.          -13.          -14.99999999]
 ...
 [   0.            0.            0.            0.        ]]
最优策略 =[2 1 1 1 1 1 1 1 1 1 1 2
1 1 1 1 1 1 1 1 1 1 1 2 
1 1 1 1 1 1 1 1 1 1 1 2 
0 0 0 0 0 0 0 0 0 0 1 0]

4. 动态规划

4.1 案例:冰面滑行

冰面滑行问题(FrozenLake-v0)是扩展库Gym里内置的一个文本环境任务。该问题的背景如下:在一个大小为4x4的湖面上,有些地方结冰了,有些地方没有结冰。我们可以用一个4x4的字符矩阵来表示湖面的情况,

SFFF
FHFH
FFFH
HFFG

其中字母“F”(Frozen)表示结冰的区域,字母“H”(Hole)表示未结冰的冰窟窿,字母“S”(Start)和字母“G”(Goal)分别表示移动任务的起点和目标。在这个湖面上要执行以下的移动任务:要从“S”处移动到“G”处。每一次移动,可以选择“左”、“下”、“右”、“上”4个方向之一进行移动,每次移动一格。如果移动到G处,则回合结束,获得1个单位的奖励;如果移动到“H”处,则回合结束,没有获得奖励;如果移动到其它字母,暂不获得奖励,可以继续。由于冰面滑,所以实际移动的方向和想要移动的方向并不一定完全一致。例如,如果在某个地方想要左移,但是由于冰面滑,实际可能下移、右移或者上移。任务的目标是尽可能达到“G”处,以获得奖励。

4.1.1 实验环境使用

首先我们来看如何使用扩展库Gym中的环境。

用下列语句引入环境对象:

import gym 
env = gym.make('FrozenLake-v0')
env = env.unwrapped

这个环境的状态空间有16个不同的状态 { s 1 , s 2 , . . . , s 15 } \{s_1, s_2,...,s_{15}\} {s1,s2,...,s15},表示当前处在哪一个位置;动作空间有4个不同的动作 { a 0 , a 1 , a 2 , a 3 } \{a_0, a_1, a_2, a_3\} {a0,a1,a2,a3},分别表示上、下、左、右四个方向。在扩展库Gym中,直接用int型数值来表示这些状态和动作。下列代码可以查看环境的状态空间和动作空间,

print(env.observation_space)
print(env.action_space)

这个环境的动力系统储存在env.P里。可以用下列方法查看某个状态(如状态14)某个动作(如右移)情况下的动力,

env.unwrapped.P[14][2]

它是一个元组列表,每个元组包括概率、下一状态、奖励和回合结束指示这4个部分。例如,env.P[14][2]返回元组列表[(0.333333333333333,14, 0.0, False),(0.333333333333333, 15, 1.0 True), (0.333333333333333333, 10, 0.0, False)]这表明:
p ( s 14 , 0 ∣ s 14 , a 2 ) = 1 3 p(s_{14}, 0|s_{14},a_2) = \frac{1}{3} p(s14,0s14,a2)=31
p ( s 15 , 1 ∣ s 14 , a 2 ) = 1 3 p(s_{15}, 1|s_{14},a_2) = \frac{1}{3} p(s15,1s14,a2)=31
p ( s 10 , 0 ∣ s 14 , a 2 ) = 1 3 p(s_{10}, 0|s_{14},a_2) = \frac{1}{3} p(s10,0s14,a2)=31
接下来我们来看怎么使用环境。像之前介绍的,要使用env.reset()和env.step来执行。执行一个回合的代码如下所示,其中的play_policy()函数接受参数policy,这是一个16x4的np.array对象,表示策略 π \pi π

play_policy()函数返回一个浮点数,表示本回合的奖励。

# 使用策略执行一个回合
def play_policy(env, policy, render=False):
    total_reward = 0.
    observation = env.reset()
    while True:
        if render:
            env.render()  # 此行可显示
        action = np.random.choice(env.action_space.n, p=policy[observation])
        observation, reward, done, _ = env.step(action)
        total_reward += reward   # 统计回合奖励
        if done:  # 游戏结束
            break
    return total_reward

接下来用刚刚定义的play_policy()函数来看看随机策略的性能。下面代码够早了随机策略random_policy,它对于任意的 s ∈ S , a ∈ A s \in \mathcal{S}, a \in \mathcal{A} sS,aA均有 π ( s , a ) = 1 ∣ A ∣ \pi(s, a) = \frac{1}{|\mathcal{A}|} π(s,a)=A1.运行下列代码,可以求得随机策略获得奖励的期望值。一般情况下的结果基本为0,这意味着随机策略几乎不可能成功到达目的地。

# 随机策略
    random_policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nA

    episode_rewards = [play_policy(env, random_policy) for _ in range(100)]
    print("随机策略 平均奖励 :{}".format(np.mean(episode_rewards)))
4.1.2 有模型策略迭代求解

这个部分实现策略评估、策略提升和策略迭代。

首先来看策略评估。以下代码给出了策略评估的代码。它首先定义了函数v2q(),这个函数可以根据状态价值函数计算含有某个状态的动作价值函数。利用者函数evaluate_policy()函数迭代结算了给定策略policy的状态价值。这个函数使用theta作为精度控制的参数。之后的代码测试了evaluate_policy()函数。它首先求得了随机策略的状态价值函数,然后利用v2q求得动作价值函数。

# 策略评估的实现
def v2q(env, v, s=None, gamma=1.):
    if s is not None:
        q = np.zeros(env.unwrapped.nA)
        for a in range(env.unwrapped.nA):
            for prob, next_state, reward, done in env.unwrapped.P[s][a]:
                q[a] += prob * (reward + gamma * v[next_state] * (1. - done))
    else:
        q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))
        for s in range(env.unwrapped.nS):
            q[s] = v2q(env, v, s, gamma)
    return q


# 定义策略评估函数
def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):
    v = np.zeros(env.unwrapped.nS)
    while True:
        delta = 0
        for s in range(env.unwrapped.nS):
            vs = sum(policy[s] * v2q(env, v, s, gamma))
            delta = max(delta, abs(v[s]-vs))
            v[s] = vs
        if delta < tolerant:
            break
    return v
# 对随机策略进行策略评估
print('状态价值函数:')
v_random = evaluate_policy(env, random_policy)
print(v_random.reshape(4, 4))

print('动作价值函数:')
q_random = v2q(env, v_random)
print(q_random)

接下来看看策略改进。improve_policy()函数实现了策略改进算法。输入的策略是policy,改进后的策略直接覆盖原有的policy。该函数返回一个bool类型的值,表示输入的策略是不是最优策略。之后的代码测试了improve_policy()函数,它对随机策略进行改进,得到了一个确定性策略。

# 策略改进的实现
def improve_policy(env, v, policy, gamma=1.):
    optimal = True
    for s in range(env.unwrappped.nS):
        q = v2q(env, v, s, gamma)
        a = np.argmax(q)
        if policy[s][a] != 1:
            optimal = False
            policy[s] = 0.
            policy[s][a] = 1.
    return optimal
# 对随机策略进行策略改进
policy = random_policy.copy()
optimal = improve_policy(env, v_random, policy)
if optimal:
	print('无更新, 最优策略为:')
else:
	print('有更新,最优策略为:')
print(policy)

实现了策略评估和策略改进后,我们就可以实现策略迭代。iterate_policy()函数实现了策略迭代算法。之后的代码对iterate_policy()进行测试。针对冰面滑性问题,该代码求得了最优策略,并进行了测试。

# 策略迭代的实现
def iterate_policy(env, gamma=1., tolerant=1e-6:
	policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.wrapped.nA
	while True:
		v = evaluate_policy(env, policy, gamma, tolerant)
		if improve_policy(env, v, policy):
			break
	return policy, v
# 利用策略迭代求解最优策略
policy_pi, v_pi = iterate_policy(env)
print('状态价值函数 = ')
print(v_pi.reshape(4, 4)
print('最优策略 = ')
print(np.argmax(policy_pi, axis=1).reshape(4, 4))
4.1.3 有模型价值迭代求解

现在我们用价值迭代算法求解冰面滑行问题的最优策略。以下代码中iterate_value()函数实现了价值迭代算法。这个函数使用参数tolerant来控制价值迭代的精度。之后的代码在冰面滑行问题上测试了iterate_value()函数。

# 价值迭代的实现
def iterate_value(env, gamma=1, tolerant=1e-6):
    v = np.zeros(env.unwrapped.nS)
    while True:
        delta = 0
        for s in range(env.unwrapped.nS):
            vmax = max(v2q(env, v, s, gamma))
            delta = max(delta, abs(v[s]-vmax))
            v[s] = vmax
            if delta < tolerant:
                break

    policy = np.zeros((env.unwrapped.nS, env.unwrapped.nA))
    for s in range(env.unwrapped.nS):
        a = np.argmax(v2q(env, v, s, gamma))
        policy[s][a] = 1.
    return policy, v
    # 利用策略迭代求解最优策略
    policy_pi, v_pi = iterate_policy(env)
    print('状态价值函数 = ')
    print(v_pi.reshape(4, 4))
    print('最优策略 = ')
    print(np.argmax(policy_pi, axis=1).reshape(4, 4))
    print('价值迭代 平均奖励:{}'.format(play_policy(env, policy_pi)))

策略迭代和价值迭代得到的最优价值函数和最优策略应该是一致的。最优状态价值函数为:
[ 0.8235 0.8235   0.8235 0.8235 0.8235 0 0.5294 0 0.8235 0.8235 0.7647 0 0 0.8824 0.9412 0 ] \begin{bmatrix}0.8235 &0.8235 &0.8235& 0.8235\\ 0.8235&0&0.5294&0\\ 0.8235&0.8235&0.7647& 0\\ 0&0.8824& 0.9412&0\end{bmatrix} 0.82350.82350.823500.8235 00.82350.88240.82350.52940.76470.94120.8235000

最优策略为:
[ 0 3 3 3 0 0 0 0 3 1 0 0 0 2 1 0 ] \begin{bmatrix}0&3&3&3\\ 0&0&0&0\\ 3&1&0&0\\ 0&2&1&0\end{bmatrix} 0030301230013000

在pycharm中可以运行的总程序如下,

import gym
import numpy as np


# 使用策略执行一个回合
def play_policy(env, policy, render=False):
    total_reward = 0.
    observation = env.reset()
    while True:
        if render:
            env.render()  # 此行可显示
        action = np.random.choice(env.action_space.n, p=policy[observation])
        observation, reward, done, _ = env.step(action)
        total_reward += reward   # 统计回合奖励
        if done:  # 游戏结束
            break
    return total_reward


# 策略评估的实现
def v2q(env, v, s=None, gamma=1.):
    if s is not None:
        q = np.zeros(env.unwrapped.nA)
        for a in range(env.unwrapped.nA):
            for prob, next_state, reward, done in env.unwrapped.P[s][a]:
                q[a] += prob * (reward + gamma * v[next_state] * (1. - done))
    else:
        q = np.zeros((env.unwrapped.nS, env.unwrapped.nA))
        for s in range(env.unwrapped.nS):
            q[s] = v2q(env, v, s, gamma)
    return q


# 定义策略评估函数
def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):
    v = np.zeros(env.unwrapped.nS)
    while True:
        delta = 0
        for s in range(env.unwrapped.nS):
            vs = sum(policy[s] * v2q(env, v, s, gamma))
            delta = max(delta, abs(v[s]-vs))
            v[s] = vs
        if delta < tolerant:
            break
    return v


# 策略改进
def improve_policy(env, v, policy, gamma=1.):
    optimal = True
    for s in range(env.unwrapped.nS):
        q = v2q(env, v, s, gamma)
        a = np.argmax(q)
        if policy[s][a] != 1:
            optimal = False
            policy[s] = 0.
            policy[s][a] = 1.
    return optimal


# 策略迭代的实现
def iterate_policy(env, gamma=1., tolerant=1e-6):
    policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nA
    while True:
        v = evaluate_policy(env, policy, gamma, tolerant)
        if improve_policy(env, v, policy):
            break
    return policy, v


# 价值迭代的实现
def iterate_value(env, gamma=1, tolerant=1e-6):
    v = np.zeros(env.unwrapped.nS)
    while True:
        delta = 0
        for s in range(env.unwrapped.nS):
            vmax = max(v2q(env, v, s, gamma))
            delta = max(delta, abs(v[s]-vmax))
            v[s] = vmax
            if delta < tolerant:
                break

    policy = np.zeros((env.unwrapped.nS, env.unwrapped.nA))
    for s in range(env.unwrapped.nS):
        a = np.argmax(v2q(env, v, s, gamma))
        policy[s][a] = 1.
    return policy, v


if __name__ == '__main__':
    env = gym.make('FrozenLake-v0')
    env = env.unwrapped

    random_policy = np.ones((env.unwrapped.nS, env.unwrapped.nA)) / env.unwrapped.nA

    episode_rewards = [play_policy(env, random_policy) for _ in range(100)]
    print("随机策略 平均奖励 :{}".format(np.mean(episode_rewards)))

    print('状态价值函数:')
    v_random = evaluate_policy(env, random_policy)
    print(v_random.reshape(4, 4))

    print('动作价值函数:')
    q_random = v2q(env, v_random)
    print(q_random)

    policy = random_policy.copy()
    optimal = improve_policy(env, v_random, policy)
    if optimal:
        print('无更新, 最优策略为:')
    else:
        print('有更新,最优策略为:')
    print(policy)

    # 利用策略迭代求解最优策略
    policy_pi, v_pi = iterate_policy(env)
    print('状态价值函数 = ')
    print(v_pi.reshape(4, 4))
    print('最优策略 = ')
    print(np.argmax(policy_pi, axis=1).reshape(4, 4))
    print('价值迭代 平均奖励:{}'.format(play_policy(env, policy_pi)))
随机策略 平均奖励 :0.04
状态价值函数:
[[0.0139372  0.01162942 0.02095187 0.01047569]
 [0.01624741 0.         0.04075119 0.        ]
 [0.03480561 0.08816967 0.14205297 0.        ]
 [0.         0.17582021 0.43929104 0.        ]]
动作价值函数:
[[0.01470727 0.01393801 0.01393801 0.01316794]
 [0.00852221 0.01162969 0.01086043 0.01550616]
 [0.02444416 0.0209521  0.02405958 0.01435233]
 [0.01047585 0.01047585 0.00698379 0.01396775]
 [0.02166341 0.01701767 0.0162476  0.01006154]
 [0.         0.         0.         0.        ]
 [0.05433495 0.04735099 0.05433495 0.00698396]
 [0.         0.         0.         0.        ]
 [0.01701767 0.04099176 0.03480569 0.04640756]
 [0.0702086  0.11755959 0.10595772 0.05895286]
 [0.18940397 0.17582024 0.16001408 0.04297362]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.08799662 0.20503708 0.23442697 0.17582024]
 [0.25238807 0.53837042 0.52711467 0.43929106]
 [0.         0.         0.         0.        ]]
有更新,最优策略为:
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
状态价值函数 = 
[[0.82351246 0.82350689 0.82350303 0.82350106]
 [0.82351416 0.         0.5294002  0.        ]
 [0.82351683 0.82352026 0.76469786 0.        ]
 [0.         0.88234658 0.94117323 0.        ]]
最优策略 = 
[[0 3 3 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]
价值迭代 平均奖励:1.0
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐