强化学习的数学原理 —— 蒙特卡罗(Monte Carlo) MC Basic 算法推导与 Python 从零实现【附源码】
一、前言
前面两篇文章【强化学习的数学原理 —— 值迭代(Value Iteration)算法推导与 Python 从零实现【附源码】】&【强化学习的数学原理 —— 策略迭代(Policy Iteration)算法推导与 Python 从零实现【附源码】】介绍了基于系统模型求解最优策略的两种算法。本篇博文将继续基于赵世钰老师《强化学习的数学原理》课程的网格世界例子,介绍第一种 无需模型(model free) 的 蒙特卡罗(Monte Carlo,MC) 强化学习算法 —— MC Basic 算法,并从零完成该算法的 Python 实现。
二、问题建模
为了方便读者阅读,本节将再次介绍问题有关背景。如已知悉,可跳过本节。
2.1 问题背景
在如图 2.1 所示的 网格世界(grid world) 中,智能体(agent) 需要从一个初始区域出发,最终到达目标区域。其中,白色单元格代表可以进入的区域,黄色单元格代表禁止进入的区域,蓝色单元格代表目标区域。
图 2.1 5×5 网格世界:出自赵世钰《强化学习的数学原理》
2.2 状态和动作空间
状态对应智能体所在单元格的位置,如图 2.1 中共有从左上到右下 25 个状态,表示为状态空间(state space) S = { s 1 , s 2 , … , s 25 } \mathcal{S} = \{s_1,s_2,\dots , s_{25}\} S={s1,s2,…,s25}。
在网格世界中,智能体在每一个状态有 5 个可选的动作:向上移动、向右移动、向下移动、向左移动、保持不动。所有动作的集合被称为动作空间(action space) A = { a 1 , a 2 , … , a 5 } \mathcal{A} = \{a_1,a_2,\dots , a_5\} A={a1,a2,…,a5},具体如图 2.2 所示。
图 2.2 动作空间:出自赵世钰《强化学习的数学原理》原图 1.3 (b)
2.3 奖励
在一个状态执行一个动作后,智能体会得到奖励(reward) ,可以写成 r ( s , a ) r(s,a) r(s,a)。一般来说,正的奖励表示我们鼓励智能体采取相应的动作;负的奖励表示我们不鼓励智能体采取该动作。在图 2.1 的网格世界中,我们设置如下奖励:
- 如果智能体越过四周边界,设 r b o u n d a r y = − 1 r_{boundary}=-1 rboundary=−1;
- 如果智能体试图进入禁止区域,设 r f r o b i d d e n = − 10 r_{frobidden}=-10 rfrobidden=−10;
- 如果智能体到达了目标区域,设 r t a r g e t = + 1 r_{target}=+1 rtarget=+1;
- 在其他情况下,智能体获得的奖励为 r o t h e r = 0 r_{other}=0 rother=0。
三、MC Basic算法
3.1 无需模型的转换
蒙特卡罗方法是以策略迭代算法为基础的,因此也分为 “策略评估” 和 “策略改进” 两个步骤。其中,策略改进步骤的元素展开形式是
π k + 1 ( s ) = arg max π ∑ a π ( a ∣ s ) ( ∑ r p ( r ∣ s , a ) r + γ ∑ s ′ p ( s ′ ∣ s , a ) v π k ( s ′ ) ) = arg max π ∑ a π ( a ∣ s ) q π k ( s , a ) , s ∈ S \begin{align*} \pi_{k+1}(s) &= \arg \max_{\pi} \sum_{a} \pi(a|s) {\left( \sum_{r} p(r|s,a)r + \gamma \sum_{s'} p(s'|s,a)v_{\pi_k}(s') \right)}\\ &= \arg \max_{\pi} \sum_a \pi(a|s)q_{\pi_k}(s,a), \quad s \in \mathcal{S} \end{align*} πk+1(s)=argπmaxa∑π(a∣s)(r∑p(r∣s,a)r+γs′∑p(s′∣s,a)vπk(s′))=argπmaxa∑π(a∣s)qπk(s,a),s∈S
无需模型的方法,即不需要知道模型 { p ( r ∣ s , a ) , p ( s ′ ∣ s , a ) } \{p(r|s,a),p(s'|s,a)\} {p(r∣s,a),p(s′∣s,a)} ,通过蒙特卡罗方法用数据来直接估计 q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a) 。具体做法是,从 ( s , a ) (s,a) (s,a) 开始,智能体可以执行策略 π k \pi_k πk ,进而获得 n n n 个回合,假设第 i i i 个回合的回报是 g π k ( i ) ( s , a ) g_{\pi_k}^{(i)}(s,a) gπk(i)(s,a) 。则这些回合的回报的平均值可以用来近似 q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a) ,即
q π k ( s , a ) = E [ G t ∣ S t = s , A t = a ] ≈ 1 n ∑ i = 1 n g π k ( i ) ( s , a ) q_{\pi_k}(s,a)=\mathbb{E} [G_t|S_t=s,A_t=a]\approx \frac{1}{n}\sum_{i=1}^ng_{\pi_k}^{(i)}(s,a) qπk(s,a)=E[Gt∣St=s,At=a]≈n1i=1∑ngπk(i)(s,a)
根据大数定律,如果 n n n 足够大,上式的近似将会足够精确。
3.2 MC Basic 算法
从初始策略 π 0 \pi_0 π0 开始,该算法在第 k k k 次迭代( k = 0 , 1 , 2 , … k=0,1,2,\dots k=0,1,2,…)中有两个步骤。
- 步骤 1:策略评估。这一步用于估算所有 ( s , a ) (s,a) (s,a) 的 q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a)。具体来说,对于每个 ( s , a ) (s,a) (s,a),收集足够多的回合进而求其回报的平均值(记作 q k ( s , a ) q_k(s,a) qk(s,a))来近似 q π k ( s , a ) q_{\pi_k}(s,a) qπk(s,a)。
- 步骤 2:策略改进。这一步通过 π k + 1 ( s ) = arg max π ∑ a π ( a ∣ s ) q k ( s , a ) \pi_{k+1}(s)=\arg \max_{\pi}\sum_a \pi(a|s)q_k(s,a) πk+1(s)=argmaxπ∑aπ(a∣s)qk(s,a) 得到所有 s ∈ S s \in \mathcal{S} s∈S 的新策略,即 π k + 1 ( a k ∗ ∣ s ) = 1 \pi_{k+1}(a_k^*|s)=1 πk+1(ak∗∣s)=1,其中 a k ∗ = arg max a q k ( s , a ) a_k^*=\arg \max_aq_k(s,a) ak∗=argmaxaqk(s,a)。
3.3 伪代码
总结上述数学原理,该算法的伪代码如图 3.1 所示:
图 3.1 策略迭代伪代码:出自赵世钰《强化学习的数学原理》原算法 5.1
四、Python 代码实现
整套代码开发思路分成:环境定义、交互逻辑、可视化、策略迭代算法四部分,本篇挑选决定算法逻辑的四个核心函数细说,绘图工具类代码不再拆分讲解,最后附上完整源码方便读者一键运行。
4.1 get_reward 函数
get_reward 用来判定当前格子的即时奖励,对照奖励规则:到达目标 + 1、踏入禁区 - 10、普通格子收益 0。
def get_reward(state):
if state == TARGET_STATE:
return R_TARGET
if state in FORBIDDEN_STATES:
return R_FORBIDDEN
return R_NORMAL
4.2 step 函数
step 是智能体执行动作后的状态转移规则,整个网格世界使用 5×5 矩阵表示,i 为行,j 为列。
def step(state, action):
i, j = state
di, dj = action
ni, nj = i + di, j + dj
if ni < 0 or ni >= GRID_SIZE or nj < 0 or nj >= GRID_SIZE:
return (i, j), R_BOUNDARY
next_state = (ni, nj)
return next_state, get_reward(next_state)
- 向网格外侧移动算作越界,原地停留并扣除边界惩罚 -1;
- 在网格内正常移动,落地后调用
get_reward获取对应奖励。
4.3 sample_episode 函数
sample_episod 函数是蒙特卡罗算法中最核心的轨迹采样与回报计算工具,用于评估某个初始状态 - 动作对 ( s , a ) (s,a) (s,a) 的好坏。它的作用是从指定起点 start_s、指定初始动作 start_a 出发,按照给定策略 policy 走一条完整轨迹(episode),并返回这条轨迹从起点到终点的最终累计回报 G。
# 采样轨迹
def sample_episode(self, start_s, start_a, policy, max_step=100):
episode = []
s = start_s
a = start_a
for _ in range(max_step):
ns, r = self.env.step(s, a)
episode.append((s, a, r))
a = policy[ns[0], ns[1]]
s = ns
if s == self.env.TARGET_STATE:
break
G = 0
ret_list = []
for _, _, r in reversed(episode):
G = r + self.env.GAMMA * G
ret_list.append(G)
ret_list.reverse()
return ret_list[0]
读者可以更改
max_step的值来限制每回合的长度,探究回合长度对 Monte Carlo 算法收敛性的影响。
4.4 蒙特卡罗训练主循环
train 函数是基于蒙特卡洛采样的策略迭代算法的核心训练函数。它会自动交替执行「策略评估」和「策略改进」,直到策略收敛到最优。
该函数的整体流程概括如下:用 MC 采样估计 Q 值 → 更新价值函数 V → 改进策略 → 重复直到收敛。
def train(self, save_img=False):
iter_cnt = 0
while True:
V_old = self.V.copy()
Q = np.zeros((self.env.GRID_SIZE, self.env.GRID_SIZE, self.env.n_actions))
# 策略评估
for i in range(self.env.GRID_SIZE):
for j in range(self.env.GRID_SIZE):
for a in range(self.env.n_actions):
total = 0.0
for _ in range(self.EPISODE_PER_SA):
total += self.sample_episode((i, j), a, self.policy)
Q[i, j, a] = total / self.EPISODE_PER_SA
# 价值更新
self.V = np.max(Q, axis=-1)
delta = np.max(np.abs(self.V - V_old))
# 策略改进:平局时随机选择一个最优动作
old_policy = self.policy.copy()
for i in range(self.env.GRID_SIZE):
for j in range(self.env.GRID_SIZE):
q_vals = Q[i, j, :]
max_q = np.max(q_vals)
best_actions = np.where(q_vals == max_q)[0]
self.policy[i, j] = np.random.choice(best_actions)
iter_cnt += 1
if delta < self.env.THETA:
print(f"✅ 收敛!迭代次数:{iter_cnt}")
break
return self.V, old_policy, Q
4.4 完整源码
import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, Circle
# ===================== 环境类:地图 + 绘图 =====================
class GridWorld:
def __init__(self):
# 基础配置
self.GRID_SIZE = 5
self.GAMMA = 0.9
self.THETA = 1e-6
# 奖励
self.R_TARGET = 1
self.R_FORBIDDEN = -10
self.R_BOUNDARY = -1
self.R_NORMAL = 0
# 动作:原地、上、右、下、左
self.ACTIONS = [(0, 0), (-1, 0), (0, 1), (1, 0), (0, -1)]
self.n_actions = len(self.ACTIONS)
# 地图
self.FORBIDDEN_STATES = [(1, 1), (1, 2), (2, 2), (3, 1), (3, 3), (4, 1)]
self.TARGET_STATE = (3, 2)
# 奖励函数
def get_reward(self, state):
if state == self.TARGET_STATE:
return self.R_TARGET
if state in self.FORBIDDEN_STATES:
return self.R_FORBIDDEN
return self.R_NORMAL
# 环境步进
def step(self, state, action_idx):
i, j = state
di, dj = self.ACTIONS[action_idx]
ni, nj = i + di, j + dj
if 0 <= ni < self.GRID_SIZE and 0 <= nj < self.GRID_SIZE:
next_state = (ni, nj)
else:
return (i, j), self.R_BOUNDARY
return next_state, self.get_reward(next_state)
# ===================== 绘图 =====================
def draw(self, V, policy, iter_num, save_flag=False):
plt.figure(figsize=(12, 5))
ax1 = plt.subplot(1, 2, 1)
self.plot_policy(policy, ax1)
ax2 = plt.subplot(1, 2, 2)
self.plot_value(V, ax2)
plt.suptitle(f'Monte Carlo Iter: {iter_num}', fontsize=14)
plt.tight_layout()
if save_flag:
save_dir = "Monte Carlo/Basic/iter_img"
os.makedirs(save_dir, exist_ok=True) # 自动创建文件夹
plt.savefig(f"{save_dir}/iter_{iter_num:04d}.png", dpi=150)
plt.close()
else:
plt.show()
def plot_policy(self, policy, ax):
ax.set_xlim(0, self.GRID_SIZE)
ax.set_ylim(0, self.GRID_SIZE)
ax.invert_yaxis()
ax.set_xticks(range(self.GRID_SIZE))
ax.set_yticks(range(self.GRID_SIZE))
ax.grid(True, color='black', linewidth=1)
ax.set_title('Optimal Policy (MC)', fontsize=12)
for i in range(self.GRID_SIZE):
for j in range(self.GRID_SIZE):
rect = Rectangle((j, i), 1, 1, ec='black', lw=1)
ax.add_patch(rect)
if (i, j) in self.FORBIDDEN_STATES:
rect.set_facecolor('gold')
elif (i, j) == self.TARGET_STATE:
rect.set_facecolor('cornflowerblue')
else:
rect.set_facecolor('white')
arrow_scale = 0.3
circle_r = 0.15
for i in range(self.GRID_SIZE):
for j in range(self.GRID_SIZE):
act = policy[i, j]
xc, yc = j + 0.5, i + 0.5
dx, dy = 0, 0
if act == 1:
dy = -arrow_scale
elif act == 2:
dx = arrow_scale
elif act == 3:
dy = arrow_scale
elif act == 4:
dx = -arrow_scale
elif act == 0:
ax.add_patch(Circle((xc, yc), circle_r, color='red', fill=False, lw=2))
continue
ax.arrow(xc, yc, dx, dy, head_width=0.12, head_length=0.12, fc='red', ec='red')
def plot_value(self, V, ax):
im = ax.imshow(V, cmap='coolwarm', origin='upper')
ax.set_xticks(range(self.GRID_SIZE))
ax.set_yticks(range(self.GRID_SIZE))
ax.set_title('State Value V(s)', fontsize=12)
for i in range(self.GRID_SIZE):
for j in range(self.GRID_SIZE):
ax.text(j, i, f'{V[i, j]:.2f}', ha='center', va='center', fontweight='bold')
plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
# ===================== 算法类:蒙特卡洛策略迭代 =====================
class MonteCarlo_Basic:
def __init__(self, env):
self.env = env
self.policy = np.random.randint(0, self.env.n_actions, size=(env.GRID_SIZE, env.GRID_SIZE))
self.V = np.zeros((env.GRID_SIZE, env.GRID_SIZE))
self.EPISODE_PER_SA = 30
# 采样轨迹
def sample_episode(self, start_s, start_a, policy, max_step=100):
episode = []
s = start_s
a = start_a
for _ in range(max_step):
ns, r = self.env.step(s, a)
episode.append((s, a, r))
a = policy[ns[0], ns[1]]
s = ns
if s == self.env.TARGET_STATE:
break
G = 0
ret_list = []
for _, _, r in reversed(episode):
G = r + self.env.GAMMA * G
ret_list.append(G)
ret_list.reverse()
return ret_list[0]
# 训练主循环
def train(self, save_img=False):
iter_cnt = 0
while True:
V_old = self.V.copy()
Q = np.zeros((self.env.GRID_SIZE, self.env.GRID_SIZE, self.env.n_actions))
# 策略评估
for i in range(self.env.GRID_SIZE):
for j in range(self.env.GRID_SIZE):
for a in range(self.env.n_actions):
total = 0.0
for _ in range(self.EPISODE_PER_SA):
total += self.sample_episode((i, j), a, self.policy)
Q[i, j, a] = total / self.EPISODE_PER_SA
# 价值更新
self.V = np.max(Q, axis=-1)
delta = np.max(np.abs(self.V - V_old))
# 绘图(环境类负责)
if save_img:
self.env.draw(self.V, self.policy, iter_cnt, save_flag=True)
# 策略改进:平局时随机选择一个最优动作
old_policy = self.policy.copy()
for i in range(self.env.GRID_SIZE):
for j in range(self.env.GRID_SIZE):
q_vals = Q[i, j, :]
max_q = np.max(q_vals)
best_actions = np.where(q_vals == max_q)[0]
self.policy[i, j] = np.random.choice(best_actions)
iter_cnt += 1
if delta < self.env.THETA:
print(f"✅ 收敛!迭代次数:{iter_cnt}")
break
if not save_img:
self.env.draw(self.V, old_policy, iter_cnt - 1, save_flag=False)
return self.V, old_policy, Q
# ===================== 主程序 =====================
if __name__ == '__main__':
env = GridWorld()
mc = MonteCarlo_Basic(env)
V_opt, policy_opt, Q_opt = mc.train(save_img=True)
另外,代码增加可视化控制:
save_img = True自动保存每一轮迭代图片到\Monte Carlo\Basic\iter_img目录下,方便观察策略与价值的收敛变化;save_img = False仅在算法收敛后弹出最终效果图,适合日常调试运行。
五、运行结果
初始状态,随机生成所有格子的初始策略,根据初始策略计算 V π 0 V_{\pi_0} Vπ0 ,策略及状态值如图 5.1 所示。
图 5.1 左:策略图;右:状态值图
第 1 轮迭代后,得到图 5.2 结果。可以看到最靠近目标区域的格子策略得到了很好的改善,但其它区域内的策略还是随机生成的,并且它们的 state value 也不是很好。
图 5.2 左:策略图;右:状态值图
第 2 轮迭代后,得到图 5.3 结果。可以看到最后一行第四列的格子,策略不再是随机生成的,而是变成了向左移动,策略得到提升。
图 5.3 左:策略图;右:状态值图
按照上述过程,策略逐步提升。最终,经历 12 次迭代收敛后,得到最优策略及状态值热力图如图 5.4 所示。
图 5.4 左:策略图;右:状态值图
六、其他说明
本篇博文重在代码实际实现,对于强化学习的基本概念和数学公式推导(如马尔可夫决策、贝尔曼方程等),还需读者自行查阅相关资料。这里附上赵世钰老师的b站讲解视频链接及Github,供读者参考。
- http://【【强化学习的数学原理】课程:从零开始到透彻理解(完结)】https://www.bilibili.com/video/BV1sd4y167NS?vd_source=f84a75bab7b99fe147edd8140c3f1b8f
- https://github.com/MathFoundationRL/Book-Mathematical-Foundation-of-Reinforcement-Learning
- 强化学习的数学原理 —— 值迭代(Value Iteration)算法推导与 Python 从零实现【附源码】
- 强化学习的数学原理 —— 策略迭代(Policy Iteration)算法推导与 Python 从零实现【附源码】
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)