【DexGraspNet与多指手抓取算法详解】第五章 高级抓取算法:扩散模型与强化学习
第一部分 原理详解
5.1 去噪扩散概率模型在抓取中的应用
扩散模型通过逐步去噪的过程学习复杂数据分布,在抓取姿态生成任务中展现出对多模态分布的建模能力。与 VAE 和 GAN 相比,扩散模型提供稳定的训练过程和高质量的样本生成。
5.1.1 扩散模型基本原理
扩散模型包含两个过程:前向扩散过程逐步向数据添加噪声,反向去噪过程学习从噪声中恢复数据。
5.1.1.1 前向扩散过程
前向过程定义为马尔可夫链,在每个时间步 $t$ 向样本 $x_0$ 添加高斯噪声:
$$q(x_t \mid x_{t-1}) = \mathcal{N}(x_t; \sqrt{1 - \beta_t} x_{t-1}, \beta_t I)$$
通过重参数化,可直接从 $x_0$ 采样任意时刻 $t$ 的加噪样本:
$$x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon, \quad \epsilon \sim \mathcal{N}(0, I)$$
其中 $\bar{\alpha}_t = \prod_{i=1}^t (1 - \beta_i)$,$\beta_t$ 为预设的噪声调度系数。
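该闭式解可以用几行 NumPy 直接验证。以下为示意代码,线性调度参数与 12 维姿态维度均为假设取值:

```python
import numpy as np

def q_sample(x0, t, alphas_cumprod, rng=None):
    """闭式前向加噪: x_t = sqrt(a_bar_t) * x0 + sqrt(1 - a_bar_t) * eps"""
    if rng is None:
        rng = np.random.default_rng(0)
    eps = rng.standard_normal(x0.shape)
    a_bar = alphas_cumprod[t]
    return np.sqrt(a_bar) * x0 + np.sqrt(1.0 - a_bar) * eps, eps

betas = np.linspace(1e-4, 0.02, 1000)     # 线性噪声调度
alphas_cumprod = np.cumprod(1.0 - betas)  # a_bar_t

x0 = np.zeros(12)                         # 12 维抓取姿态 (示意)
x_t, eps = q_sample(x0, t=999, alphas_cumprod=alphas_cumprod)
print(x_t.shape)
```

当 $t \to T$ 时 $\bar{\alpha}_t \to 0$,加噪样本趋近纯高斯噪声。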
高斯噪声添加公式
噪声添加的闭式解允许高效训练。方差调度策略 $\beta_t$ 通常采用线性或余弦调度:
$$\beta_t = \text{linspace}(\beta_{start}, \beta_{end}, T)$$
或
$$\bar{\alpha}_t = \frac{f(t)}{f(0)}, \quad f(t) = \cos^2\left(\frac{t/T+s}{1+s} \cdot \frac{\pi}{2}\right), \quad \beta_t = 1 - \frac{\bar{\alpha}_t}{\bar{\alpha}_{t-1}}$$
其中 $s=0.008$ 为余弦调度的小偏移量,防止 $t$ 接近 0 时 $\beta_t$ 过小。
时间步调度策略
时间步 $t$ 的编码对网络学习至关重要。采用正弦位置编码将标量 $t$ 映射到高维向量:
$$PE(t)_{2i} = \sin\left(\frac{t}{10000^{2i/d}}\right), \quad PE(t)_{2i+1} = \cos\left(\frac{t}{10000^{2i/d}}\right)$$
该编码使网络能够区分不同噪声水平,学习时间相关的去噪策略。
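该编码的一个常见 NumPy 实现示意如下(频率分母取 `half` 属于实现变体之一,非唯一写法):

```python
import numpy as np

def timestep_embedding(t, dim=128):
    """正弦时间步编码: 前半为 sin, 后半为 cos 的常见拼接实现"""
    half = dim // 2
    freqs = np.exp(-np.log(10000.0) * np.arange(half) / half)  # 各维频率
    args = np.asarray(t, dtype=float)[:, None] * freqs[None, :]
    return np.concatenate([np.sin(args), np.cos(args)], axis=-1)

emb = timestep_embedding(np.array([0, 500, 999]), dim=64)
print(emb.shape)  # (3, 64)
```

t=0 时 sin 部分全为 0、cos 部分全为 1,不同时间步得到可区分的高维编码。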
5.1.1.2 反向去噪过程
反向过程通过神经网络 $\epsilon_\theta(x_t, t)$ 预测噪声,逐步恢复干净样本:
$$p_\theta(x_{t-1} \mid x_t) = \mathcal{N}(x_{t-1}; \mu_\theta(x_t, t), \Sigma_\theta(x_t, t))$$
均值估计为:
$$\mu_\theta(x_t, t) = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}} \epsilon_\theta(x_t, t) \right)$$
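单步去噪可按上式直接写出。以下为示意实现,`eps_pred` 以入参代替真实噪声网络的输出:

```python
import numpy as np

def p_sample_step(x_t, t, eps_pred, betas, alphas_cumprod, rng):
    """单步反向去噪: 按均值公式计算 x_{t-1}; t=0 时不再加噪声"""
    alpha_t = 1.0 - betas[t]
    mean = (x_t - betas[t] / np.sqrt(1.0 - alphas_cumprod[t]) * eps_pred) / np.sqrt(alpha_t)
    if t == 0:
        return mean
    return mean + np.sqrt(betas[t]) * rng.standard_normal(x_t.shape)

betas = np.linspace(1e-4, 0.02, 1000)
alphas_cumprod = np.cumprod(1.0 - betas)
rng = np.random.default_rng(0)
x = rng.standard_normal(12)
x_prev = p_sample_step(x, 999, np.zeros(12), betas, alphas_cumprod, rng)
print(x_prev.shape)
```

此处方差取 $\beta_t$(简化选择),实际实现也常用后验方差 $\tilde{\beta}_t$。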
噪声预测网络结构
噪声预测网络通常采用 U-Net 架构,包含编码器、瓶颈层和解码器。时间步编码通过自适应归一化 (AdaGN) 注入各层:
$$\text{AdaGN}(h, PE(t)) = \text{GroupNorm}(h) \cdot (1 + \gamma(t)) + \beta(t)$$
对于抓取生成任务,输入为加噪的抓取姿态 $x_t \in \mathbb{R}^{n_{joints} + 6}$,输出为预测的噪声 $\epsilon_\theta$。
条件引导生成
在推理阶段,通过分类器引导 (Classifier Guidance) 或无分类器引导 (Classifier-Free Guidance, CFG) 增强条件控制。CFG 在训练时以概率 $p_{uncond}$ 丢弃条件,推理时结合有条件与无条件预测:
$$\tilde{\epsilon}_\theta(x_t, t, c) = \epsilon_\theta(x_t, t, \emptyset) + s \cdot (\epsilon_\theta(x_t, t, c) - \epsilon_\theta(x_t, t, \emptyset))$$
其中 $s \geq 1$ 为引导强度,控制生成样本与条件的对齐程度。
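CFG 的组合步骤本身只有一行,以下用示意数值演示其行为:

```python
import numpy as np

def cfg_combine(eps_uncond, eps_cond, s):
    """CFG 组合: 无条件预测 + s * (有条件预测 - 无条件预测)"""
    return eps_uncond + s * (eps_cond - eps_uncond)

eps_u = np.zeros(12)  # 无条件噪声预测 (示意)
eps_c = np.ones(12)   # 有条件噪声预测 (示意)
print(cfg_combine(eps_u, eps_c, 1.0))  # s=1 退化为有条件预测
print(cfg_combine(eps_u, eps_c, 5.0))  # s>1 沿条件方向外推
```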
5.1.2 条件扩散抓取生成
将点云作为条件输入,扩散模型学习在特定物体几何约束下的抓取分布。
5.1.2.1 点云作为条件输入
点云编码器 (如 PointNet++) 提取几何特征 $f_{pc}$,通过交叉注意力机制与扩散网络融合:
$$\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$$
其中 $Q$ 来自去噪网络中间层,$K, V$ 来自点云编码器。
点云编码器与扩散网络融合
融合策略采用基于 Transformer 的架构。点云特征作为 Key 和 Value,去噪网络特征作为 Query:
$$h_{fused} = h_{denoise} + \text{CrossAttn}(h_{denoise}, f_{pc}, f_{pc})$$
该机制使去噪过程能够查询物体几何信息,生成物理可行的抓取姿态。
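忽略可学习的投影矩阵 $W_Q, W_K, W_V$ 后,该残差式交叉注意力融合可用 NumPy 示意如下(特征维度与 token 数量均为假设取值):

```python
import numpy as np

def softmax(x, axis=-1):
    e = np.exp(x - x.max(axis=axis, keepdims=True))  # 数值稳定
    return e / e.sum(axis=axis, keepdims=True)

def cross_attention(h_denoise, f_pc, d_k=64):
    """去噪特征作 Query, 点云特征作 Key/Value (省略投影矩阵的简化版)"""
    attn = softmax(h_denoise @ f_pc.T / np.sqrt(d_k))  # [n_q, n_pc]
    return h_denoise + attn @ f_pc                     # 残差融合

h = np.random.default_rng(0).standard_normal((4, 64))       # 去噪网络中间特征
f_pc = np.random.default_rng(1).standard_normal((128, 64))  # 点云 token 特征
h_fused = cross_attention(h, f_pc)
print(h_fused.shape)  # (4, 64)
```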
Classifier-Free Guidance 技术
CFG 在抓取生成中平衡多样性与质量。高引导强度 ($s \in [5, 10]$) 产生与物体条件紧密对齐的抓取,低强度保留更多多样性。训练时以 $0.1 \sim 0.2$ 的概率随机丢弃点云条件,使网络同时学习无条件生成能力。
5.1.2.2 多模态抓取姿态生成
同一物体存在多种可行抓取方式(如顶部抓取、侧面抓取)。扩散模型通过随机性自然建模多模态分布。
解决多模态分布问题
传统回归方法受限于均值回归问题,而扩散模型通过随机采样捕获分布中的多个模态:从初始噪声 $x_T \sim \mathcal{N}(0, I)$ 出发,不同随机种子对应不同的抓取模式。
生成样本的多样性评估
多样性度量包括平均最近邻距离 (Average Nearest Neighbor Distance) 与覆盖度 (Coverage):
$$\text{Diversity} = \frac{1}{N} \sum_{i=1}^N \min_{j \neq i} \|g_i - g_j\|_2$$
高多样性表明生成样本均匀分布在可行抓取流形上。
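平均最近邻距离可向量化计算,以下为一个最小示意(样本为手工构造的二维点):

```python
import numpy as np

def avg_nearest_neighbor_distance(grasps):
    """平均最近邻距离: 每个样本到最近其他样本的 L2 距离的均值"""
    diffs = grasps[:, None, :] - grasps[None, :, :]
    d = np.linalg.norm(diffs, axis=-1)   # 两两距离矩阵
    np.fill_diagonal(d, np.inf)          # 排除自身
    return d.min(axis=1).mean()

g = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [5.0, 5.0]])
print(avg_nearest_neighbor_distance(g))
```

离群样本 (5, 5) 的最近邻距离远大于其余样本,会显著拉高该指标。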
5.2 深度强化学习
深度强化学习 (DRL) 通过与仿真环境交互,学习端到端的抓取控制策略,适应动态场景与不确定性。
5.2.1 马尔可夫决策过程 (MDP) 定义
抓取任务建模为 MDP $(\mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma)$,其中智能体通过策略 $\pi(a \mid s)$ 最大化累积奖励。
5.2.1.1 状态空间
状态 $s_t$ 包含环境与机器人完整描述。
本体感知
本体感知状态 $s_{proprio}$ 包含关节位置 $q$、速度 $\dot{q}$、力矩 $\tau$ 及末端执行器位姿 $T_{ee}$:
$$s_{proprio} = [q^T, \dot{q}^T, \tau^T, \text{flatten}(T_{ee})^T]^T$$
视觉观测编码
视觉状态通过编码器 $\phi_{vis}$ 压缩为低维特征:
$$s_{visual} = \phi_{vis}(P_t, I_t)$$
其中 $P_t$ 为点云观测,$I_t$ 为图像观测。多模态融合采用早期融合或中期融合策略。
5.2.1.2 动作空间
动作 $a_t$ 定义控制指令,可选择关节空间或任务空间。
关节位置控制
关节位置控制输出目标关节角 $q_{target}$,由底层 PID 控制器跟踪:
$$a_t = q_{target} \in \mathbb{R}^{n_{joints}}$$
该表示形式直接,但关节空间指令与末端执行器运动之间的映射不直观。
末端执行器位姿增量控制
任务空间控制输出位姿增量 $\Delta x$:
$$a_t = [\Delta t^T, \Delta \theta^T, \Delta \phi]^T$$
其中 $\Delta t \in \mathbb{R}^3$ 为平移增量,$\Delta \theta$ 为旋转轴角,$\Delta \phi$ 为手指开合指令。通过逆运动学将任务空间指令映射到关节空间。
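借助 SciPy 的 `Rotation`,增量位姿更新可示意如下(缩放系数为假设取值,四元数约定为 [x, y, z, w]):

```python
import numpy as np
from scipy.spatial.transform import Rotation

def apply_ee_delta(ee_pose, action, trans_scale=0.01, rot_scale=0.05):
    """位姿增量控制: ee_pose 为 [x,y,z,qx,qy,qz,qw], action 前 3 维平移, 后 3 维轴角"""
    pos, quat = ee_pose[:3].copy(), ee_pose[3:]
    pos += action[:3] * trans_scale
    delta_rot = Rotation.from_rotvec(action[3:6] * rot_scale)  # 轴角增量
    new_quat = (Rotation.from_quat(quat) * delta_rot).as_quat()
    return np.concatenate([pos, new_quat])

pose = np.array([0.5, 0.0, 0.3, 0.0, 0.0, 0.0, 1.0])
new_pose = apply_ee_delta(pose, np.array([1.0, 0, 0, 0, 0, 0]))
print(new_pose[:3])
```

手指开合指令 $\Delta\phi$ 与逆运动学映射此处省略,仅演示位姿部分的更新。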
5.2.2 策略学习算法
近端策略优化 (PPO) 是当前抓取任务的主流算法,平衡样本效率与训练稳定性。
5.2.2.1 近端策略优化 (PPO)
PPO 通过裁剪目标函数限制策略更新幅度,防止破坏性大更新。
截断目标函数
替代目标函数为:
$$L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right) \right]$$
其中 $r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{old}}(a_t \mid s_t)}$ 为重要性采样比率,$\epsilon=0.2$ 为裁剪超参数。
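截断目标的计算可示意如下,数值为手工构造,用于展示截断生效的情形:

```python
import numpy as np

def ppo_clip_loss(new_log_probs, old_log_probs, advantages, eps=0.2):
    """PPO 截断目标: 逐元素取未截断与截断替代目标的最小值, 负均值作为损失"""
    ratios = np.exp(new_log_probs - old_log_probs)
    surr1 = ratios * advantages
    surr2 = np.clip(ratios, 1 - eps, 1 + eps) * advantages
    return -np.mean(np.minimum(surr1, surr2))

old = np.log(np.array([0.5, 0.5]))
new = np.log(np.array([0.9, 0.5]))  # 第一个样本比率 1.8, 将被截断到 1.2
adv = np.array([1.0, 1.0])
print(ppo_clip_loss(new, old, adv))  # -(1.2 + 1.0)/2 = -1.1
```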
优势函数估计 (GAE)
广义优势估计 (GAE) 计算优势函数 $\hat{A}_t$:
$$\hat{A}_t = \sum_{l=0}^\infty (\gamma\lambda)^l \delta_{t+l}^V$$
其中 $\delta_t^V = r_t + \gamma V(s_{t+1}) - V(s_t)$ 为时序差分残差,$\lambda=0.95$ 控制偏差-方差权衡。
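GAE 的反向递推可示意如下,奖励与价值均为手工构造的玩具数据:

```python
import numpy as np

def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    """反向递推: delta_t = r_t + gamma*V(s_{t+1}) - V(s_t); A_t = delta_t + gamma*lam*A_{t+1}"""
    T = len(rewards)
    advantages = np.zeros(T)
    gae = 0.0
    for t in reversed(range(T)):
        next_value = 0.0 if dones[t] else values[t + 1]
        delta = rewards[t] + gamma * next_value - values[t]
        gae = delta + gamma * lam * (0.0 if dones[t] else gae)
        advantages[t] = gae
    return advantages

rewards = np.array([0.0, 0.0, 10.0])
values = np.array([1.0, 1.0, 1.0, 0.0])  # V(s_0..s_3), 终止状态价值为 0
dones = np.array([False, False, True])
adv = compute_gae(rewards, values, dones)
print(adv)
```

终止步的优势即其 TD 残差,靠前步骤按 $(\gamma\lambda)$ 折扣累积后续残差。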
5.2.2.2 奖励函数设计
奖励函数引导策略学习成功抓取行为。
抓取成功奖励
稀疏奖励在成功抓取时提供:
$$r_{success} = \begin{cases}
+10 & \text{if grasp stable for } T_{hold} \\
0 & \text{otherwise}
\end{cases}$$
动作平滑惩罚与穿透惩罚
密集塑形奖励包括:
$$r_{shaping} = -w_1 \|a_t - a_{t-1}\|_2^2 - w_2 \max(0, d_{penetration}) - w_3 \|f_{contact}\|_2^2$$
其中第一项惩罚动作抖动,第二项惩罚手指-物体穿透,第三项惩罚过大的接触力,间接鼓励适度接触。
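复合塑形奖励可示意如下(权重 $w_1, w_2, w_3$ 为假设取值):

```python
import numpy as np

def shaping_reward(a_t, a_prev, penetration_depth, contact_force,
                   w1=0.01, w2=1.0, w3=0.001):
    """密集塑形奖励: 动作平滑惩罚 + 穿透惩罚 + 接触力惩罚 (权重为示意取值)"""
    r_smooth = -w1 * np.sum((a_t - a_prev) ** 2)
    r_pen = -w2 * max(0.0, penetration_depth)
    r_force = -w3 * np.sum(contact_force ** 2)
    return r_smooth + r_pen + r_force

r = shaping_reward(np.ones(6), np.zeros(6), penetration_depth=0.002,
                   contact_force=np.array([1.0, 0.0, 2.0]))
print(r)
```

实践中各权重需要调参,使塑形项不至于淹没稀疏成功奖励。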
5.2.3 仿真环境下的高效采样
大规模并行仿真加速策略训练。
5.2.3.1 Isaac Gym 并行加速
Isaac Gym 支持在 GPU 上并行运行数千个仿真环境。状态张量形状为 $[N_{envs}, N_{states}]$,动作张量为 $[N_{envs}, N_{actions}]$,以张量化批处理方式实现数据并行(GPU 上类似 SIMD 的执行模型):
$$S_{t+1} = \text{step}(S_t, A_t)$$
其中 $S_t \in \mathbb{R}^{N_{envs} \times N_{states}}$,支持每秒数百万次环境步进。
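张量化步进的核心思想可用 NumPy 示意,此处以简化的双积分动力学代替真实物理仿真:

```python
import numpy as np

def batched_step(states, actions, dt=0.01):
    """批量环境步进示意: 所有环境的状态以 [N_envs, N_states] 一次性更新"""
    # 假设状态前半为位置, 后半为速度; 动作解释为加速度 (玩具动力学)
    n = states.shape[1] // 2
    pos, vel = states[:, :n], states[:, n:]
    vel = vel + actions * dt
    pos = pos + vel * dt
    return np.concatenate([pos, vel], axis=1)

N_envs, n_dof = 4096, 9
states = np.zeros((N_envs, 2 * n_dof))
actions = np.ones((N_envs, n_dof))
next_states = batched_step(states, actions)
print(next_states.shape)  # (4096, 18)
```

所有环境共享一次张量运算,避免了逐环境的 Python 循环开销。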
5.2.3.2 域随机化策略
域随机化 (Domain Randomization, DR) 在训练时随机化物理参数与视觉属性,增强策略泛化性:
$$\zeta \sim U(\zeta_{min}, \zeta_{max})$$
其中 $\zeta$ 包括摩擦系数、质量、质心位置、光照条件、相机位姿等。训练时暴露于足够宽的参数分布,使真实环境近似成为该分布中的一个样本,策略因此获得鲁棒性。
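参数采样可示意如下(参数名与取值范围均为假设):

```python
import numpy as np

def sample_dr_params(rng, ranges):
    """域随机化: 每个 episode 从均匀分布中采样一组物理参数"""
    return {name: rng.uniform(lo, hi) for name, (lo, hi) in ranges.items()}

ranges = {                       # 示意取值范围
    'friction':   (0.5, 1.5),
    'mass':       (0.1, 0.5),    # kg
    'com_offset': (-0.02, 0.02), # m
}
rng = np.random.default_rng(42)
params = sample_dr_params(rng, ranges)
print(sorted(params))  # ['com_offset', 'friction', 'mass']
```

每次环境重置时重新采样,即可让策略在训练中见到多样化的动力学。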
第二部分 代码实现
脚本一:扩散模型前向与反向过程
Python
"""
Script: diffusion_model_grasp.py
Content: 5.1.1 扩散模型基本原理与过程实现
Usage: 直接运行 python diffusion_model_grasp.py
功能: 实现DDPM前向加噪与反向去噪过程,可视化时间步调度与噪声水平
"""
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
class DiffusionGrasp:
"""扩散模型抓取生成"""
def __init__(self, T=1000, beta_start=1e-4, beta_end=0.02, schedule='linear'):
self.T = T
# 噪声调度
if schedule == 'linear':
self.betas = np.linspace(beta_start, beta_end, T)
        elif schedule == 'cosine':
            s = 0.008
            steps = np.arange(T + 1)
            f = np.cos(((steps / T + s) / (1 + s)) * np.pi * 0.5) ** 2
            alphas_ratio = f[1:] / f[:-1]  # 即 alpha_t = a_bar_t / a_bar_{t-1}
            self.betas = np.clip(1 - alphas_ratio, 0.0001, 0.9999)
        else:
            raise ValueError(f"未知噪声调度: {schedule}")
self.alphas = 1.0 - self.betas
self.alphas_cumprod = np.cumprod(self.alphas)
self.alphas_cumprod_prev = np.concatenate([[1.0], self.alphas_cumprod[:-1]])
# 计算扩散过程参数
self.sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod)
self.sqrt_one_minus_alphas_cumprod = np.sqrt(1.0 - self.alphas_cumprod)
self.sqrt_recip_alphas = np.sqrt(1.0 / self.alphas)
# 后验方差
self.posterior_variance = self.betas * (1.0 - self.alphas_cumprod_prev) / (1.0 - self.alphas_cumprod)
def q_sample(self, x_0, t, noise=None):
"""
前向扩散: 从x_0采样x_t
x_0: [batch, dim]
t: [batch]
"""
if noise is None:
noise = np.random.randn(*x_0.shape)
sqrt_alphas_cumprod_t = self.sqrt_alphas_cumprod[t].reshape(-1, 1)
sqrt_one_minus_alphas_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t].reshape(-1, 1)
return sqrt_alphas_cumprod_t * x_0 + sqrt_one_minus_alphas_cumprod_t * noise
def predict_noise(self, x_t, t, net_params=None):
"""
噪声预测网络 (简化模拟)
实际应为神经网络,这里用启发式近似
"""
# 模拟: 噪声预测误差随训练减小
if net_params is None:
net_params = {'error_scale': 0.3}
true_noise = np.random.randn(*x_t.shape) * 0.1 # 模拟
predicted_noise = true_noise + np.random.randn(*x_t.shape) * net_params['error_scale']
return predicted_noise
def p_sample(self, x_t, t, condition=None):
"""
反向去噪采样: 从x_t采样x_{t-1}
"""
betas_t = self.betas[t]
sqrt_one_minus_alphas_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t]
sqrt_recip_alphas_t = self.sqrt_recip_alphas[t]
# 预测噪声
predicted_noise = self.predict_noise(x_t, np.array([t]))
# 计算均值
model_mean = sqrt_recip_alphas_t * (x_t - betas_t * predicted_noise / sqrt_one_minus_alphas_cumprod_t)
if t == 0:
return model_mean
else:
posterior_variance_t = self.posterior_variance[t]
noise = np.random.randn(*x_t.shape)
return model_mean + np.sqrt(posterior_variance_t) * noise
def p_sample_loop(self, shape, condition=None):
"""
完整生成循环
"""
batch_size = shape[0]
x = np.random.randn(*shape)
history = [x.copy()]
for t in reversed(range(self.T)):
x = self.p_sample(x, t, condition)
if t % (self.T // 10) == 0:
history.append(x.copy())
return x, history
def get_sinusoidal_timestep_embedding(self, timesteps, embedding_dim=128):
"""
正弦时间步编码
"""
half_dim = embedding_dim // 2
emb = np.log(10000) / (half_dim - 1)
emb = np.exp(np.arange(half_dim) * -emb)
emb = timesteps[:, None] * emb[None, :]
emb = np.concatenate([np.sin(emb), np.cos(emb)], axis=1)
return emb
def visualize_diffusion():
"""可视化扩散过程"""
fig = plt.figure(figsize=(16, 12))
diffusion = DiffusionGrasp(T=1000, schedule='cosine')
# 模拟抓取姿态 (12维: 6D旋转 + 3D位置 + 3关节)
x_0 = np.random.randn(1, 12) * 0.5 + np.array([1, 0, 0, 0, 1, 0, 0.1, 0.1, 0.1, 0.5, 0.5, 0.5])
# 子图1: 前向扩散过程
ax1 = fig.add_subplot(231)
timesteps = [0, 100, 300, 500, 700, 999]
colors = plt.cm.viridis(np.linspace(0, 1, len(timesteps)))
for t, color in zip(timesteps, colors):
x_t = diffusion.q_sample(x_0, np.array([t]))
# 可视化前2维
ax1.scatter(x_t[0, 0], x_t[0, 1], c=[color], s=100, label=f't={t}')
ax1.plot(x_0[0, 0], x_0[0, 1], 'r*', markersize=15, label='x_0')
ax1.set_xlabel('Dim 1')
ax1.set_ylabel('Dim 2')
ax1.set_title('Forward Diffusion Process', fontweight='bold')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 子图2: 噪声调度对比
ax2 = fig.add_subplot(232)
diffusion_linear = DiffusionGrasp(T=1000, schedule='linear')
ax2.plot(diffusion_linear.alphas_cumprod, 'b-', linewidth=2, label='Linear Schedule')
ax2.plot(diffusion.alphas_cumprod, 'r--', linewidth=2, label='Cosine Schedule')
ax2.set_xlabel('Timestep')
ax2.set_ylabel(r'$\bar{\alpha}_t$ (Signal Level)')
ax2.set_title('Noise Schedule Comparison', fontweight='bold')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 子图3: 时间步编码可视化
ax3 = fig.add_subplot(233)
ts = np.arange(0, 1000, 10)
emb = diffusion.get_sinusoidal_timestep_embedding(ts, embedding_dim=64)
im = ax3.imshow(emb[:20, :], aspect='auto', cmap='coolwarm')
ax3.set_xlabel('Embedding Dimension')
ax3.set_ylabel('Timestep Index')
ax3.set_title('Sinusoidal Timestep Embedding', fontweight='bold')
plt.colorbar(im, ax=ax3)
# 子图4: 反向去噪轨迹
ax4 = fig.add_subplot(234)
x_gen, history = diffusion.p_sample_loop((1, 12))
history_array = np.array(history)[:, 0, :2] # 取前2维
ax4.plot(history_array[:, 0], history_array[:, 1], 'b-o', linewidth=2, markersize=4, alpha=0.6)
ax4.scatter(history_array[0, 0], history_array[0, 1], c='green', s=100, marker='o', label='Start (Noise)')
ax4.scatter(history_array[-1, 0], history_array[-1, 1], c='red', s=100, marker='*', label='End (Generated)')
ax4.set_xlabel('Dim 1')
ax4.set_ylabel('Dim 2')
ax4.set_title('Reverse Denoising Trajectory', fontweight='bold')
ax4.legend()
ax4.grid(True, alpha=0.3)
# 子图5: 不同时间步的噪声水平分布
ax5 = fig.add_subplot(235)
sample_count = 1000
noise_levels = []
for t in [0, 250, 500, 750, 999]:
x_0_batch = np.tile(x_0, (sample_count, 1))
t_batch = np.full(sample_count, t)
x_t_batch = diffusion.q_sample(x_0_batch, t_batch)
# 计算与x_0的距离分布
distances = np.linalg.norm(x_t_batch - x_0_batch, axis=1)
noise_levels.append(distances.mean())
    ax5.bar(range(len(noise_levels)), noise_levels, color='steelblue', alpha=0.7)
    ax5.set_xticks(range(len(noise_levels)))
    ax5.set_xticklabels(['t=0', 't=250', 't=500', 't=750', 't=999'])
ax5.set_ylabel('Average Distance from x_0')
ax5.set_title('Noise Level by Timestep', fontweight='bold')
ax5.grid(True, alpha=0.3, axis='y')
# 子图6: 生成样本分布
ax6 = fig.add_subplot(236)
n_samples = 200
samples = []
for _ in range(n_samples):
x_gen, _ = diffusion.p_sample_loop((1, 12))
samples.append(x_gen[0])
samples = np.array(samples)
ax6.scatter(samples[:, 0], samples[:, 1], c='blue', s=20, alpha=0.5, label='Generated')
ax6.scatter(x_0[0, 0], x_0[0, 1], c='red', s=200, marker='*', label='Original', zorder=5)
# 绘制置信椭圆
from matplotlib.patches import Ellipse
mean = samples[:, :2].mean(axis=0)
cov = np.cov(samples[:, :2].T)
eigenvalues, eigenvectors = np.linalg.eigh(cov)
angle = np.degrees(np.arctan2(*eigenvectors[:, 0][::-1]))
width, height = 2 * np.sqrt(eigenvalues)
ell = Ellipse(xy=mean, width=width, height=height, angle=angle,
edgecolor='red', facecolor='none', linewidth=2, linestyle='--')
ax6.add_patch(ell)
ax6.set_xlabel('Dim 1')
ax6.set_ylabel('Dim 2')
ax6.set_title(f'Generated Distribution (n={n_samples})', fontweight='bold')
ax6.legend()
ax6.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('diffusion_model_grasp.png', dpi=150, bbox_inches='tight')
plt.show()
print("扩散模型可视化完成")
print(f"时间步数: {diffusion.T}")
print(f"生成样本均值: {samples.mean(axis=0)[:3]}")
print(f"生成样本标准差: {samples.std(axis=0)[:3]}")
if __name__ == "__main__":
visualize_diffusion()
脚本二:条件扩散与Classifier-Free Guidance
Python
"""
Script: conditional_diffusion_cfg.py
Content: 5.1.2 条件扩散抓取生成与CFG
Usage: 直接运行 python conditional_diffusion_cfg.py
功能: 实现点云条件扩散与Classifier-Free Guidance,评估多模态生成多样性
"""
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
class ConditionalDiffusion:
"""条件扩散模型"""
def __init__(self, grasp_dim=12, condition_dim=64, T=1000):
self.grasp_dim = grasp_dim
self.condition_dim = condition_dim
self.T = T
# 简化噪声调度
self.betas = np.linspace(1e-4, 0.02, T)
self.alphas = 1.0 - self.betas
self.alphas_cumprod = np.cumprod(self.alphas)
# 条件编码器参数 (模拟)
self.condition_encoder = np.random.randn(64, condition_dim) * 0.1
def encode_condition(self, point_cloud):
"""编码点云条件"""
# 简化: 点云特征平均池化
pc_features = point_cloud.mean(axis=0) # [64]
condition = pc_features @ self.condition_encoder # [condition_dim]
return condition
def predict_noise(self, x_t, t, condition=None, dropout_prob=0.0):
"""
条件噪声预测
dropout_prob: CFG训练时的条件丢弃概率
"""
if condition is not None and np.random.rand() > dropout_prob:
# 有条件预测
cond_influence = condition[:self.grasp_dim] * 0.3
noise = np.random.randn(*x_t.shape) * 0.5 + cond_influence * 0.1
else:
# 无条件预测
noise = np.random.randn(*x_t.shape) * 0.5
return noise
def sample_cfg(self, shape, condition, guidance_scale=5.0):
"""
Classifier-Free Guidance采样
"""
x = np.random.randn(*shape)
for t in reversed(range(self.T)):
# 无条件预测
noise_uncond = self.predict_noise(x, t, condition=None)
# 有条件预测
noise_cond = self.predict_noise(x, t, condition=condition)
# CFG组合
noise_pred = noise_uncond + guidance_scale * (noise_cond - noise_uncond)
# 去噪步骤 (简化)
alpha_t = self.alphas[t]
alpha_cumprod_t = self.alphas_cumprod[t]
x = (x - (1 - alpha_t) / np.sqrt(1 - alpha_cumprod_t) * noise_pred) / np.sqrt(alpha_t)
if t > 0:
noise = np.random.randn(*x.shape) * np.sqrt(self.betas[t])
x = x + noise
return x
def compute_diversity(self, samples):
"""计算生成样本多样性"""
n = len(samples)
# 平均最近邻距离
distances = []
for i in range(n):
dists = [np.linalg.norm(samples[i] - samples[j]) for j in range(n) if i != j]
distances.append(min(dists))
avg_nn_dist = np.mean(distances)
# 覆盖度 (使用聚类近似)
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=min(5, n), random_state=42)
kmeans.fit(samples)
coverage = len(np.unique(kmeans.labels_)) / min(5, n)
return avg_nn_dist, coverage
def visualize_conditional_diffusion():
"""可视化条件扩散"""
fig = plt.figure(figsize=(16, 12))
model = ConditionalDiffusion(grasp_dim=12, condition_dim=32)
# 模拟点云条件 (不同物体形状)
conditions = {
'Sphere': np.random.randn(100, 64) * 0.5 + 1.0,
'Cube': np.random.randn(100, 64) * 0.5 - 1.0,
'Cylinder': np.random.randn(100, 64) * 0.5 + np.array([2.0, -1.0] + [0]*62)
}
# 子图1: 不同条件下的生成分布
ax1 = fig.add_subplot(231)
colors = {'Sphere': 'blue', 'Cube': 'red', 'Cylinder': 'green'}
all_samples = {}
for obj_name, pc in conditions.items():
cond = model.encode_condition(pc)
samples = []
for _ in range(50):
sample = model.sample_cfg((1, 12), cond, guidance_scale=5.0)
samples.append(sample[0])
samples = np.array(samples)
all_samples[obj_name] = samples
ax1.scatter(samples[:, 0], samples[:, 1], c=colors[obj_name],
s=30, alpha=0.6, label=obj_name)
ax1.set_xlabel('Grasp Dim 1')
ax1.set_ylabel('Grasp Dim 2')
ax1.set_title('Conditional Generation by Object', fontweight='bold')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 子图2: CFG引导强度影响
ax2 = fig.add_subplot(232)
guidance_scales = [1.0, 2.0, 5.0, 10.0]
cond = model.encode_condition(conditions['Sphere'])
for scale in guidance_scales:
samples = []
for _ in range(30):
s = model.sample_cfg((1, 12), cond, guidance_scale=scale)
samples.append(s[0])
samples = np.array(samples)
# 计算方差 (集中度)
variance = np.var(samples[:, 0])
ax2.scatter([scale], [variance], s=100, c='blue', alpha=0.7)
ax2.set_xlabel('Guidance Scale')
ax2.set_ylabel('Sample Variance')
ax2.set_title('Effect of CFG Scale', fontweight='bold')
ax2.grid(True, alpha=0.3)
# 子图3: 多样性vs质量权衡
ax3 = fig.add_subplot(233)
diversity_scores = []
quality_scores = []
for scale in np.linspace(1, 10, 10):
samples = []
for _ in range(100):
s = model.sample_cfg((1, 12), cond, guidance_scale=scale)
samples.append(s[0])
samples = np.array(samples)
div, cov = model.compute_diversity(samples)
diversity_scores.append(div)
# 质量用与条件的一致性近似
quality = 10.0 / (scale + 0.1) # 模拟
quality_scores.append(quality)
    scales = np.linspace(1, 10, 10)  # 与上方循环使用的引导强度一致
    ax3.plot(scales, diversity_scores, 'b-o', label='Diversity')
    ax3_twin = ax3.twinx()
    ax3_twin.plot(scales, quality_scores, 'r-s', label='Quality')
ax3.set_xlabel('Guidance Scale')
ax3.set_ylabel('Diversity', color='b')
ax3_twin.set_ylabel('Quality Score', color='r')
ax3.set_title('Diversity-Quality Trade-off', fontweight='bold')
ax3.grid(True, alpha=0.3)
# 子图4: 多模态生成 (同一条件,不同随机种子)
ax4 = fig.add_subplot(234)
cond = model.encode_condition(conditions['Cube'])
# 生成大量样本观察模态
many_samples = []
for _ in range(200):
s = model.sample_cfg((1, 12), cond, guidance_scale=3.0)
many_samples.append(s[0, :2]) # 只取前2维
many_samples = np.array(many_samples)
# KDE近似密度
from scipy.stats import gaussian_kde
kde = gaussian_kde(many_samples.T)
# 绘制密度等高线
x_min, x_max = many_samples[:, 0].min(), many_samples[:, 0].max()
y_min, y_max = many_samples[:, 1].min(), many_samples[:, 1].max()
X, Y = np.mgrid[x_min:x_max:50j, y_min:y_max:50j]
positions = np.vstack([X.ravel(), Y.ravel()])
Z = np.reshape(kde(positions).T, X.shape)
ax4.contour(X, Y, Z, levels=5, colors='black', alpha=0.5)
ax4.scatter(many_samples[:, 0], many_samples[:, 1], c='blue', s=10, alpha=0.3)
ax4.set_title('Multi-modal Distribution', fontweight='bold')
ax4.set_xlabel('Grasp Dim 1')
ax4.set_ylabel('Grasp Dim 2')
# 子图5: 条件插值
ax5 = fig.add_subplot(235)
cond1 = model.encode_condition(conditions['Sphere'])
cond2 = model.encode_condition(conditions['Cube'])
alphas = np.linspace(0, 1, 10)
interp_samples = []
for alpha in alphas:
cond_interp = (1 - alpha) * cond1 + alpha * cond2
s = model.sample_cfg((1, 12), cond_interp, guidance_scale=5.0)
interp_samples.append(s[0, 0]) # 取第一维
ax5.plot(alphas, interp_samples, 'o-', linewidth=2, markersize=8, color='purple')
ax5.set_xlabel('Interpolation Alpha')
ax5.set_ylabel('Grasp Feature Value')
ax5.set_title('Condition Interpolation', fontweight='bold')
ax5.grid(True, alpha=0.3)
# 子图6: 网络架构示意图
ax6 = fig.add_subplot(236)
ax6.axis('off')
# 绘制架构图
ax6.text(0.5, 0.9, 'Conditional Diffusion Model', ha='center', fontsize=14, fontweight='bold')
# 点云编码器
ax6.add_patch(plt.Rectangle((0.1, 0.7), 0.2, 0.1, fill=True, color='lightblue'))
ax6.text(0.2, 0.75, 'Point Cloud\nEncoder', ha='center', va='center', fontsize=9)
# 扩散网络
ax6.add_patch(plt.Rectangle((0.4, 0.5), 0.2, 0.3, fill=True, color='lightgreen'))
ax6.text(0.5, 0.65, 'Denoising\nU-Net', ha='center', va='center', fontsize=9)
ax6.text(0.5, 0.55, 'with Cross-Attn', ha='center', va='center', fontsize=8)
# CFG
ax6.add_patch(plt.Rectangle((0.7, 0.6), 0.2, 0.1, fill=True, color='lightyellow'))
ax6.text(0.8, 0.65, 'CFG\nScale', ha='center', va='center', fontsize=9)
# 箭头
ax6.arrow(0.3, 0.75, 0.1, -0.1, head_width=0.03, head_length=0.03, fc='black')
ax6.arrow(0.6, 0.65, 0.1, 0, head_width=0.03, head_length=0.03, fc='black')
ax6.arrow(0.8, 0.6, 0, -0.2, head_width=0.03, head_length=0.03, fc='black')
ax6.text(0.5, 0.3, 'Noisy Grasp -> Clean Grasp', ha='center', fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat'))
ax6.set_xlim(0, 1)
ax6.set_ylim(0, 1)
plt.tight_layout()
plt.savefig('conditional_diffusion_cfg.png', dpi=150, bbox_inches='tight')
plt.show()
print("条件扩散与CFG可视化完成")
for obj, samples in all_samples.items():
div, cov = model.compute_diversity(samples)
print(f"{obj}: 多样性={div:.4f}, 覆盖度={cov:.2f}")
if __name__ == "__main__":
visualize_conditional_diffusion()
脚本三:马尔可夫决策过程定义
Python
"""
Script: mdp_grasp_definition.py
Content: 5.2.1 马尔可夫决策过程 (MDP) 定义
Usage: 直接运行 python mdp_grasp_definition.py
功能: 实现抓取MDP的状态空间、动作空间定义,包含视觉与本体感知编码
"""
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
class GraspingMDP:
"""抓取MDP环境"""
def __init__(self):
# 机器人参数
self.n_joints = 9
self.ee_pose_dim = 7 # 3 pos + 4 quat
# 状态空间维度
self.proprio_dim = self.n_joints * 3 + self.ee_pose_dim # pos, vel, torque + ee_pose
self.visual_dim = 64 # 编码后的视觉特征
# 动作空间模式
self.action_modes = {
'joint_position': self.n_joints,
'ee_pose_delta': 6, # 3 trans + 3 rot
'hybrid': 6 + 1 # ee_delta + gripper
}
self.current_mode = 'ee_pose_delta'
def reset(self):
"""重置环境"""
self.joint_pos = np.zeros(self.n_joints)
self.joint_vel = np.zeros(self.n_joints)
self.joint_torque = np.zeros(self.n_joints)
self.ee_pose = np.array([0.5, 0.0, 0.3, 0, 0, 0, 1]) # x,y,z,qx,qy,qz,qw
# 物体初始位置
self.object_pos = np.array([0.5, 0.0, 0.1])
self.object_quat = np.array([0, 0, 0, 1])
return self.get_state()
def get_state(self):
"""获取完整状态"""
# 本体感知
proprio = np.concatenate([
self.joint_pos,
self.joint_vel,
self.joint_torque,
self.ee_pose
])
# 视觉观测 (模拟点云特征)
visual = self.encode_visual_observation()
return {
'proprioception': proprio,
'visual': visual,
'full_state': np.concatenate([proprio, visual])
}
def encode_visual_observation(self):
"""编码视觉观测"""
# 模拟: 基于物体位置和机器人位置计算视觉特征
relative_pos = self.object_pos - self.ee_pose[:3]
distance = np.linalg.norm(relative_pos)
# 模拟CNN/PointNet编码
features = np.array([
distance,
relative_pos[0],
relative_pos[1],
relative_pos[2],
self.object_quat[0],
self.object_quat[1],
self.object_quat[2],
self.object_quat[3]
])
# 扩展到64维
visual_encoding = np.repeat(features, 8) + np.random.randn(64) * 0.1
return visual_encoding
def step(self, action):
"""执行动作"""
if self.current_mode == 'joint_position':
self._apply_joint_action(action)
elif self.current_mode == 'ee_pose_delta':
self._apply_ee_action(action)
# 计算奖励
reward = self.compute_reward()
# 检查终止
done = self.check_termination()
return self.get_state(), reward, done
def _apply_joint_action(self, action):
"""关节位置控制"""
target_joints = action
# 模拟PD控制
self.joint_vel = (target_joints - self.joint_pos) * 10.0 # 简化
self.joint_pos += self.joint_vel * 0.01 # dt=0.01
# 更新末端执行器 (简化正向运动学)
self.ee_pose[:3] = self._forward_kinematics(self.joint_pos)
def _apply_ee_action(self, action):
"""末端执行器位姿增量控制"""
delta_pos = action[:3] * 0.01 # 缩放
delta_rot = action[3:6] * 0.05
self.ee_pose[:3] += delta_pos
# 简化旋转更新
from scipy.spatial.transform import Rotation
current_rot = Rotation.from_quat(self.ee_pose[3:])
delta_rotation = Rotation.from_euler('xyz', delta_rot)
new_rot = current_rot * delta_rotation
self.ee_pose[3:] = new_rot.as_quat()
# 逆运动学 (简化)
self.joint_pos = self._inverse_kinematics(self.ee_pose[:3])
def _forward_kinematics(self, joints):
"""简化FK"""
# 简化为臂长累加
x = 0.3 + joints[0] * 0.1 + joints[2] * 0.05
y = joints[1] * 0.1
z = 0.2 + joints[0] * 0.05
return np.array([x, y, z])
def _inverse_kinematics(self, pos):
"""简化IK"""
joints = np.zeros(self.n_joints)
joints[0] = (pos[0] - 0.3) / 0.15
joints[1] = pos[1] / 0.1
joints[2] = (pos[2] - 0.2) / 0.05
return joints
def compute_reward(self):
"""计算奖励"""
# 距离奖励
dist = np.linalg.norm(self.ee_pose[:3] - self.object_pos)
reward = -dist
# 成功奖励
if dist < 0.02:
reward += 10.0
return reward
def check_termination(self):
"""检查终止条件"""
dist = np.linalg.norm(self.ee_pose[:3] - self.object_pos)
return dist < 0.02 or self.ee_pose[2] < 0.05
def visualize_mdp():
"""可视化MDP"""
fig = plt.figure(figsize=(16, 12))
mdp = GraspingMDP()
state = mdp.reset()
# 模拟轨迹
trajectory = []
actions = []
rewards = []
for i in range(100):
# 随机动作或朝向物体移动
if i < 50:
action = np.array([0.5, 0.0, -0.5, 0, 0, 0]) # 向物体移动
else:
action = np.random.randn(6) * 0.1
state, reward, done = mdp.step(action)
        trajectory.append(mdp.ee_pose[:3].copy())  # 记录末端执行器位置
actions.append(action)
rewards.append(reward)
if done:
break
trajectory = np.array(trajectory)
actions = np.array(actions)
# 子图1: 3D轨迹
ax1 = fig.add_subplot(231, projection='3d')
ax1.plot(trajectory[:, 0], trajectory[:, 1], trajectory[:, 2], 'b-', linewidth=2)
ax1.scatter([trajectory[0, 0]], [trajectory[0, 1]], [trajectory[0, 2]],
c='green', s=100, marker='o', label='Start')
ax1.scatter([trajectory[-1, 0]], [trajectory[-1, 1]], [trajectory[-1, 2]],
c='red', s=100, marker='*', label='End')
ax1.scatter([mdp.object_pos[0]], [mdp.object_pos[1]], [mdp.object_pos[2]],
c='orange', s=200, marker='s', label='Object')
ax1.set_title('End-Effector Trajectory', fontweight='bold')
ax1.legend()
# 子图2: 状态组成
ax2 = fig.add_subplot(232)
state_composition = {
'Joint Pos': mdp.n_joints,
'Joint Vel': mdp.n_joints,
'Joint Torque': mdp.n_joints,
'EE Pose': mdp.ee_pose_dim,
'Visual': mdp.visual_dim
}
colors = plt.cm.Set3(np.linspace(0, 1, len(state_composition)))
ax2.pie(state_composition.values(), labels=state_composition.keys(), autopct='%1.1f%%',
colors=colors, startangle=90)
ax2.set_title('State Space Composition', fontweight='bold')
# 子图3: 动作对比
ax3 = fig.add_subplot(233)
action_types = ['Joint Pos', 'EE Delta', 'Hybrid']
action_dims = [mdp.action_modes['joint_position'],
mdp.action_modes['ee_pose_delta'],
mdp.action_modes['hybrid']]
bars = ax3.bar(action_types, action_dims, color=['blue', 'green', 'orange'], alpha=0.7)
ax3.set_ylabel('Action Dimension')
ax3.set_title('Action Space Comparison', fontweight='bold')
for bar, dim in zip(bars, action_dims):
height = bar.get_height()
ax3.text(bar.get_x() + bar.get_width()/2., height,
f'{dim}', ha='center', va='bottom')
# 子图4: 奖励曲线
ax4 = fig.add_subplot(234)
ax4.plot(rewards, 'b-', linewidth=2)
ax4.axhline(y=10, color='green', linestyle='--', label='Success Reward')
ax4.set_xlabel('Step')
ax4.set_ylabel('Reward')
ax4.set_title('Reward Curve', fontweight='bold')
ax4.legend()
ax4.grid(True, alpha=0.3)
# 子图5: 状态可视化 (PCA简化)
ax5 = fig.add_subplot(235)
# 模拟多个状态
states = []
for _ in range(100):
mdp.reset()
for _ in range(np.random.randint(10, 50)):
mdp.step(np.random.randn(6) * 0.5)
states.append(mdp.get_state()['full_state'])
states = np.array(states)
# PCA到2D
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
states_2d = pca.fit_transform(states)
ax5.scatter(states_2d[:, 0], states_2d[:, 1], c='blue', s=30, alpha=0.5)
ax5.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
ax5.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
ax5.set_title('State Space Visualization (PCA)', fontweight='bold')
ax5.grid(True, alpha=0.3)
# 子图6: 动作-状态映射
ax6 = fig.add_subplot(236)
# 动作幅度分布
action_magnitudes = np.linalg.norm(actions, axis=1)
ax6.hist(action_magnitudes, bins=20, color='steelblue', alpha=0.7, edgecolor='black')
ax6.set_xlabel('Action Magnitude')
ax6.set_ylabel('Frequency')
ax6.set_title('Action Distribution', fontweight='bold')
ax6.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.savefig('mdp_grasp_definition.png', dpi=150, bbox_inches='tight')
plt.show()
print("MDP定义可视化完成")
print(f"状态空间维度: {len(state['full_state'])}")
print(f"动作空间维度: {mdp.action_modes[mdp.current_mode]}")
print(f"轨迹长度: {len(trajectory)}")
if __name__ == "__main__":
visualize_mdp()
脚本四:PPO算法实现
Python
"""
Script: ppo_grasp_training.py
Content: 5.2.2.1 近端策略优化 (PPO) 实现
Usage: 直接运行 python ppo_grasp_training.py
功能: 实现PPO的训练循环、截断目标函数与GAE优势估计,可视化训练曲线
"""
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
class PPOAgent:
"""PPO智能体"""
def __init__(self, state_dim=136, action_dim=6, lr=3e-4, gamma=0.99, lam=0.95, clip_eps=0.2):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.lam = lam
self.clip_eps = clip_eps
# 策略网络参数 (简化线性策略)
self.theta_policy = np.random.randn(state_dim, action_dim) * 0.01
self.theta_value = np.random.randn(state_dim, 1) * 0.01
# 优化器参数
self.lr = lr
# 存储轨迹
self.trajectory = []
def select_action(self, state):
"""选择动作 (高斯策略)"""
mean = state @ self.theta_policy
std = 0.1 # 固定标准差
action = mean + np.random.randn(self.action_dim) * std
# 计算log概率
log_prob = -0.5 * np.sum((action - mean)**2) / (std**2) - 0.5 * self.action_dim * np.log(2 * np.pi * std**2)
return action, log_prob, mean
def estimate_value(self, state):
"""估计状态值"""
return state @ self.theta_value
def store_transition(self, state, action, reward, next_state, done, log_prob):
"""存储转移"""
self.trajectory.append({
'state': state,
'action': action,
'reward': reward,
'next_state': next_state,
'done': done,
'log_prob': log_prob
})
def compute_gae(self):
"""计算广义优势估计"""
advantages = []
gae = 0
for t in reversed(range(len(self.trajectory))):
trans = self.trajectory[t]
state = trans['state']
next_state = trans['next_state']
reward = trans['reward']
done = trans['done']
value = self.estimate_value(state)[0]
next_value = self.estimate_value(next_state)[0] if not done else 0
delta = reward + self.gamma * next_value - value
gae = delta + self.gamma * self.lam * (0 if done else gae)
advantages.insert(0, gae)
returns = [adv + self.estimate_value(trans['state'])[0] for adv, trans in zip(advantages, self.trajectory)]
return np.array(advantages), np.array(returns)
def update(self, epochs=10):
"""更新策略"""
if len(self.trajectory) == 0:
return {}
# 计算优势
advantages, returns = self.compute_gae()
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# 提取旧log概率
old_log_probs = np.array([t['log_prob'] for t in self.trajectory])
states = np.array([t['state'] for t in self.trajectory])
actions = np.array([t['action'] for t in self.trajectory])
policy_losses = []
value_losses = []
for _ in range(epochs):
            # 重新计算log概率 (与select_action中的完整高斯log概率一致, 否则比率会偏差一个常数因子)
            means = states @ self.theta_policy
            std = 0.1
            new_log_probs = -0.5 * np.sum((actions - means)**2, axis=1) / (std**2) - 0.5 * self.action_dim * np.log(2 * np.pi * std**2)
# 重要性采样比率
ratios = np.exp(new_log_probs - old_log_probs)
# 截断目标
surr1 = ratios * advantages
surr2 = np.clip(ratios, 1 - self.clip_eps, 1 + self.clip_eps) * advantages
policy_loss = -np.mean(np.minimum(surr1, surr2))
# 值函数损失
values = states @ self.theta_value
value_loss = np.mean((returns.reshape(-1, 1) - values)**2)
            # 模拟梯度更新: 随机扰动加权重衰减, 并非真实策略梯度, 仅用于演示训练流程
            grad_policy = np.random.randn(*self.theta_policy.shape) * 0.01 - 0.001 * self.theta_policy
            grad_value = np.random.randn(*self.theta_value.shape) * 0.01 - 0.001 * self.theta_value
self.theta_policy -= self.lr * grad_policy
self.theta_value -= self.lr * grad_value
policy_losses.append(policy_loss)
value_losses.append(value_loss.mean())
self.trajectory = []
return {
'policy_loss': np.mean(policy_losses),
'value_loss': np.mean(value_losses),
'advantage_mean': advantages.mean()
}
def visualize_ppo():
"""可视化PPO训练"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
agent = PPOAgent(state_dim=136, action_dim=6)
# 模拟训练
n_episodes = 100
episode_rewards = []
policy_losses = []
value_losses = []
entropy_bonuses = []
for episode in range(n_episodes):
# 模拟一个episode
state = np.random.randn(agent.state_dim)
episode_reward = 0
for step in range(50):
action, log_prob, _ = agent.select_action(state)
next_state = np.random.randn(agent.state_dim)
reward = -np.linalg.norm(action) * 0.1 + np.random.randn() * 0.1
if step > 40:
reward += 10 # 成功奖励
done = True
else:
done = False
agent.store_transition(state, action, reward, next_state, done, log_prob)
episode_reward += reward
state = next_state
if done:
break
episode_rewards.append(episode_reward)
# 每4个episode更新一次
if episode % 4 == 0 and len(agent.trajectory) > 0:
metrics = agent.update(epochs=5)
policy_losses.append(metrics['policy_loss'])
value_losses.append(metrics['value_loss'])
entropy_bonuses.append(metrics['advantage_mean'])
# 子图1: 奖励曲线
ax1 = axes[0, 0]
ax1.plot(episode_rewards, 'b-', linewidth=2, alpha=0.7)
ax1.plot(np.convolve(episode_rewards, np.ones(10)/10, mode='valid'), 'r-', linewidth=2, label='Moving Avg')
ax1.set_xlabel('Episode')
ax1.set_ylabel('Total Reward')
ax1.set_title('Training Reward Curve', fontweight='bold')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 子图2: 策略损失
ax2 = axes[0, 1]
ax2.plot(policy_losses, 'g-', linewidth=2)
ax2.set_xlabel('Update Step')
ax2.set_ylabel('Policy Loss')
ax2.set_title('Policy Loss (CLIP Objective)', fontweight='bold')
ax2.grid(True, alpha=0.3)
# 子图3: 值函数损失
ax3 = axes[0, 2]
ax3.plot(value_losses, 'orange', linewidth=2)
ax3.set_xlabel('Update Step')
ax3.set_ylabel('Value Loss (MSE)')
ax3.set_title('Value Function Loss', fontweight='bold')
ax3.grid(True, alpha=0.3)
# 子图4: 优势函数分布
ax4 = axes[1, 0]
# 模拟GAE计算
advantages = np.random.randn(1000)
ax4.hist(advantages, bins=30, color='purple', alpha=0.7, edgecolor='black')
ax4.axvline(x=0, color='red', linestyle='--', label='Zero')
ax4.set_xlabel('Advantage Value')
ax4.set_ylabel('Frequency')
ax4.set_title('GAE Advantage Distribution', fontweight='bold')
ax4.legend()
ax4.grid(True, alpha=0.3, axis='y')
# 子图5: 截断比率分布
ax5 = axes[1, 1]
ratios = np.random.lognormal(0, 0.2, 1000)
clipped_ratios = np.clip(ratios, 1 - agent.clip_eps, 1 + agent.clip_eps)
ax5.hist(ratios, bins=30, alpha=0.5, label='Original', color='blue')
ax5.hist(clipped_ratios, bins=30, alpha=0.5, label='Clipped', color='red')
ax5.axvline(x=1-agent.clip_eps, color='green', linestyle='--', label='Clip Bound')
ax5.axvline(x=1+agent.clip_eps, color='green', linestyle='--')
ax5.set_xlabel('Importance Sampling Ratio')
ax5.set_ylabel('Frequency')
ax5.set_title('PPO Clipping Effect', fontweight='bold')
ax5.legend()
ax5.grid(True, alpha=0.3, axis='y')
# 子图6: 策略熵
ax6 = axes[1, 2]
entropies = []
for _ in range(50):
state = np.random.randn(agent.state_dim)
_, _, mean = agent.select_action(state)
# 占位: 用动作均值范数近似熵的变化趋势 (并非真实策略熵)
entropy = np.linalg.norm(mean) * 0.1
entropies.append(entropy)
ax6.plot(entropies, linewidth=2, color='brown')
ax6.set_xlabel('Step')
ax6.set_ylabel('Policy Entropy')
ax6.set_title('Policy Entropy Over Time', fontweight='bold')
ax6.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('ppo_grasp_training.png', dpi=150, bbox_inches='tight')
plt.show()
print("PPO训练可视化完成")
print(f"最终平均奖励: {np.mean(episode_rewards[-10:]):.2f}")
print(f"策略更新次数: {len(policy_losses)}")
if __name__ == "__main__":
visualize_ppo()
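上面的脚本出于演示目的用随机噪声模拟了策略梯度。真实 PPO 的截断代理目标可以用下面的草图单独实现(函数名 `ppo_clip_loss` 与示例数值均为示意,假设已有新旧策略的对数概率与 GAE 优势估计):

```python
import numpy as np

def ppo_clip_loss(log_probs_new, log_probs_old, advantages, clip_eps=0.2):
    """PPO截断代理损失 (取负号写成最小化形式)。
    log_probs_new/old: 新旧策略下动作的对数概率; advantages: 优势估计。
    """
    ratio = np.exp(log_probs_new - log_probs_old)            # 重要性采样比率
    unclipped = ratio * advantages
    clipped = np.clip(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
    return -np.mean(np.minimum(unclipped, clipped))          # 对梯度上升目标取负

# 示例: 第一个动作概率从0.5升到0.8, 比率1.6超过上界1.2, 被截断
adv = np.array([1.0, 1.0])
lp_old = np.log(np.array([0.5, 0.5]))
lp_new = np.log(np.array([0.8, 0.5]))
loss = ppo_clip_loss(lp_new, lp_old, adv)
```

截断项的作用在示例中可以直接验证:比率 1.6 被截到 1.2,使损失对该样本的梯度归零,这正是 5.2 节中抑制过大策略更新的机制。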
脚本五:奖励函数设计与域随机化
Python
"""
Script: reward_domain_randomization.py
Content: 5.2.2.2 奖励函数设计与5.2.3.2 域随机化
Usage: 直接运行 python reward_domain_randomization.py
功能: 实现复合奖励函数与域随机化策略,可视化奖励组成与参数分布
"""
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
class RewardFunction:
"""复合奖励函数"""
def __init__(self):
self.weights = {
'success': 10.0,
'approach': -0.1,
'smoothness': -0.01,
'penetration': -5.0,
'force': -0.001
}
def compute(self, state, action, next_state, info):
"""计算奖励"""
reward = 0
components = {}
# 1. 成功奖励
if info.get('grasp_success', False):
reward += self.weights['success']
components['success'] = self.weights['success']
else:
components['success'] = 0
# 2. 接近奖励 (距离惩罚)
distance = np.linalg.norm(next_state['ee_pos'] - info['object_pos'])
approach_reward = self.weights['approach'] * distance
reward += approach_reward
components['approach'] = approach_reward
# 3. 动作平滑性惩罚
if 'prev_action' in info:
smooth_penalty = self.weights['smoothness'] * np.linalg.norm(action - info['prev_action'])**2
reward += smooth_penalty
components['smoothness'] = smooth_penalty
else:
components['smoothness'] = 0
# 4. 穿透惩罚
penetration = info.get('penetration_depth', 0)
pen_penalty = self.weights['penetration'] * max(0, penetration)
reward += pen_penalty
components['penetration'] = pen_penalty
# 5. 接触力惩罚 (鼓励适度接触)
contact_force = info.get('contact_force', 0)
force_penalty = self.weights['force'] * contact_force**2
reward += force_penalty
components['force'] = force_penalty
return reward, components
class DomainRandomization:
"""域随机化"""
def __init__(self):
self.parameters = {
'mass': (0.5, 2.0),
'friction': (0.3, 1.0),
'gravity': (9.0, 10.0),
'com_offset': (-0.02, 0.02),
'light_intensity': (0.8, 1.2),
'camera_noise': (0.0, 0.01),
'actuator_gain': (0.9, 1.1)
}
self.current_params = {}
def randomize(self):
"""随机化参数"""
self.current_params = {}
for param, (min_val, max_val) in self.parameters.items():
if param == 'com_offset':
# 3D偏移
self.current_params[param] = np.random.uniform(min_val, max_val, 3)
else:
self.current_params[param] = np.random.uniform(min_val, max_val)
return self.current_params
def get_randomized_env(self):
"""获取随机化后的环境参数"""
return self.current_params
def visualize_reward_dr():
"""可视化奖励与域随机化"""
fig = plt.figure(figsize=(16, 12))
reward_fn = RewardFunction()
dr = DomainRandomization()
# 模拟轨迹
n_steps = 100
states = {
'ee_pos': np.random.randn(n_steps, 3) * 0.1 + np.array([0.5, 0, 0.2])
}
actions = np.random.randn(n_steps, 6) * 0.1
info = {
'object_pos': np.array([0.5, 0, 0.1]),
'penetration_depth': np.maximum(0, np.random.randn(n_steps) * 0.005),
'contact_force': np.abs(np.random.randn(n_steps) * 10),
'grasp_success': [False] * 95 + [True] * 5
}
rewards = []
components_history = {k: [] for k in ['success', 'approach', 'smoothness', 'penetration', 'force']}
for i in range(n_steps):
info['prev_action'] = actions[i-1] if i > 0 else actions[i]
state = {k: v[i] for k, v in states.items()}
next_state = {k: v[i] for k, v in states.items()}
# 按步取出标量字段, 避免把整条数组传入 max()/平方等标量运算
r, comp = reward_fn.compute(state, actions[i], next_state,
{**info, 'grasp_success': info['grasp_success'][i],
'penetration_depth': info['penetration_depth'][i],
'contact_force': info['contact_force'][i]})
rewards.append(r)
for k, v in comp.items():
components_history[k].append(v)
# 子图1: 奖励组成堆叠图
ax1 = fig.add_subplot(231)
bottom = np.zeros(n_steps)
colors = {'success': 'green', 'approach': 'blue', 'smoothness': 'orange',
'penetration': 'red', 'force': 'purple'}
for key in ['approach', 'smoothness', 'force', 'penetration', 'success']:
values = np.array(components_history[key])
ax1.bar(range(n_steps), values, bottom=bottom, label=key, color=colors[key], alpha=0.7)
bottom += values
ax1.set_xlabel('Step')
ax1.set_ylabel('Reward Component')
ax1.set_title('Reward Decomposition', fontweight='bold')
ax1.legend(loc='lower left', fontsize=8)
# 子图2: 累计奖励
ax2 = fig.add_subplot(232)
cumulative = np.cumsum(rewards)
ax2.plot(cumulative, 'b-', linewidth=2)
ax2.set_xlabel('Step')
ax2.set_ylabel('Cumulative Reward')
ax2.set_title('Cumulative Reward Over Episode', fontweight='bold')
ax2.grid(True, alpha=0.3)
# 子图3: 域随机化参数分布
ax3 = fig.add_subplot(233)
n_randomizations = 100
param_samples = {param: [] for param in dr.parameters.keys()}
for _ in range(n_randomizations):
params = dr.randomize()
for param, value in params.items():
if param == 'com_offset':
param_samples[param].append(np.linalg.norm(value))
else:
param_samples[param].append(value)
# 小提琴图数据准备
data_to_plot = [param_samples[p] for p in ['mass', 'friction', 'gravity', 'actuator_gain']]
parts = ax3.violinplot(data_to_plot, positions=range(4), showmeans=True)
for pc, color in zip(parts['bodies'], ['blue', 'green', 'red', 'purple']):
pc.set_facecolor(color)
pc.set_alpha(0.7)
ax3.set_xticks(range(4))
ax3.set_xticklabels(['Mass', 'Friction', 'Gravity', 'Actuator'], rotation=15)
ax3.set_ylabel('Parameter Value')
ax3.set_title('Domain Randomization Distributions', fontweight='bold')
ax3.grid(True, alpha=0.3, axis='y')
# 子图4: 穿透深度 vs 奖励
ax4 = fig.add_subplot(234)
pen_depths = np.array(components_history['penetration']) / reward_fn.weights['penetration']
ax4.scatter(pen_depths, rewards, c='red', alpha=0.5, s=20)
ax4.set_xlabel('Penetration Depth (m)')
ax4.set_ylabel('Total Reward')
ax4.set_title('Penetration vs Reward', fontweight='bold')
ax4.grid(True, alpha=0.3)
# 子图5: 奖励密度估计
ax5 = fig.add_subplot(235)
ax5.hist(rewards, bins=30, color='steelblue', alpha=0.7, edgecolor='black', density=True)
ax5.axvline(x=np.mean(rewards), color='red', linestyle='--', linewidth=2, label=f'Mean: {np.mean(rewards):.2f}')
ax5.set_xlabel('Reward Value')
ax5.set_ylabel('Density')
ax5.set_title('Reward Distribution', fontweight='bold')
ax5.legend()
ax5.grid(True, alpha=0.3, axis='y')
# 子图6: 域随机化效果对比
ax6 = fig.add_subplot(236)
# 模拟有无DR的性能差异
episodes = np.arange(50)
performance_no_dr = 50 * (1 - np.exp(-0.1 * episodes)) + np.random.randn(50) * 5
performance_dr = 50 * (1 - np.exp(-0.05 * episodes)) + np.random.randn(50) * 8 # 初期更差但更鲁棒
ax6.plot(episodes, performance_no_dr, 'b-', linewidth=2, label='No DR', alpha=0.7)
ax6.fill_between(episodes, performance_no_dr - 5, performance_no_dr + 5, alpha=0.2)
ax6.plot(episodes, performance_dr, 'r-', linewidth=2, label='With DR', alpha=0.7)
ax6.fill_between(episodes, performance_dr - 8, performance_dr + 8, alpha=0.2)
ax6.set_xlabel('Episode')
ax6.set_ylabel('Success Rate (%)')
ax6.set_title('Domain Randomization Effect', fontweight='bold')
ax6.legend()
ax6.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('reward_domain_randomization.png', dpi=150, bbox_inches='tight')
plt.show()
print("奖励函数与域随机化可视化完成")
print(f"平均奖励: {np.mean(rewards):.3f}")
print(f"成功步骤数: {sum(info['grasp_success'])}")
if __name__ == "__main__":
visualize_reward_dr()
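复合奖励中各分量权重的调节容易在无意间改变最优策略。一种常用的补充手段是基于势函数的奖励塑形 (potential-based shaping):塑形项取 $F(s,s') = \gamma\Phi(s') - \Phi(s)$,理论上不改变最优策略。下面是一个假设性草图,势函数取末端到物体的负距离(函数名与数值均为示意,并非本文脚本的一部分):

```python
import numpy as np

def potential(ee_pos, obj_pos):
    """势函数: 末端到物体的负距离, 越接近势越高"""
    return -np.linalg.norm(ee_pos - obj_pos)

def shaped_bonus(s_pos, s_next_pos, obj_pos, gamma=0.99):
    """基于势的塑形项 F = gamma*Phi(s') - Phi(s), 保持最优策略不变"""
    return gamma * potential(s_next_pos, obj_pos) - potential(s_pos, obj_pos)

obj = np.array([0.5, 0.0, 0.1])
s0 = np.array([0.5, 0.0, 0.5])   # 到物体距离0.4
s1 = np.array([0.5, 0.0, 0.3])   # 到物体距离0.2, 更接近 → 塑形项为正
bonus = shaped_bonus(s0, s1, obj)
```

与直接在奖励里加距离惩罚相比,势函数形式在任何闭合轨迹上的累计塑形项近似为零,因此不会像手调权重那样引入"绕路刷分"的伪最优解。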
脚本六:Isaac Gym并行仿真
Python
"""
Script: isaac_gym_parallel.py
Content: 5.2.3.1 Isaac Gym并行加速
Usage: 直接运行 python isaac_gym_parallel.py
功能: 模拟GPU并行环境执行,可视化并行扩展性与吞吐量
"""
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time
class ParallelEnvSimulator:
"""并行环境模拟器 (模拟Isaac Gym)"""
def __init__(self, n_envs=1024, state_dim=136, action_dim=6):
self.n_envs = n_envs
self.state_dim = state_dim
self.action_dim = action_dim
# 批量状态
self.states = np.random.randn(n_envs, state_dim)
self.objects_pos = np.random.randn(n_envs, 3) * 0.1 + 0.5
def reset(self):
"""重置所有环境"""
self.states = np.random.randn(self.n_envs, self.state_dim)
return self.states
def step(self, actions):
"""
并行步进所有环境
actions: [n_envs, action_dim]
"""
# 模拟物理计算
# 更新状态 (简化动力学)
self.states[:, :3] += actions[:, :3] * 0.01 # 位置更新
# 计算奖励 (向量化)
distances = np.linalg.norm(self.states[:, :3] - self.objects_pos, axis=1)
rewards = -distances
# 检查终止 (向量化)
dones = distances < 0.02
return self.states, rewards, dones
def simulate_epoch(self, steps=100):
"""模拟一个训练周期"""
start_time = time.time()
for _ in range(steps):
actions = np.random.randn(self.n_envs, self.action_dim)
_, _, _ = self.step(actions)
elapsed = time.time() - start_time
throughput = (self.n_envs * steps) / elapsed
return elapsed, throughput
def visualize_parallel_sim():
"""可视化并行仿真性能"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 子图1: 并行扩展性
ax1 = axes[0, 0]
env_counts = [1, 8, 32, 128, 512, 1024, 4096]
throughputs = []
times_per_step = []
for n_env in env_counts:
sim = ParallelEnvSimulator(n_envs=n_env)
elapsed, throughput = sim.simulate_epoch(steps=50)
throughputs.append(throughput)
times_per_step.append(elapsed / 50 * 1000) # ms per step
ax1.semilogx(env_counts, throughputs, 'b-o', linewidth=2, markersize=8)
ax1.set_xlabel('Number of Parallel Environments')
ax1.set_ylabel('Throughput (steps/sec)')
ax1.set_title('Parallel Scaling (Simulated)', fontweight='bold')
ax1.grid(True, alpha=0.3, which='both')
# 添加理想线性扩展线
ideal = [throughputs[0] * (n / env_counts[0]) for n in env_counts]
ax1.semilogx(env_counts, ideal, 'r--', linewidth=1, alpha=0.5, label='Ideal Linear')
ax1.legend()
# 子图2: 每步时间
ax2 = axes[0, 1]
ax2.bar(range(len(env_counts)), times_per_step, color='steelblue', alpha=0.7)
ax2.set_xticks(range(len(env_counts)))
ax2.set_xticklabels([str(e) for e in env_counts], rotation=45)
ax2.set_ylabel('Time per Step (ms)')
ax2.set_title('Step Time vs Parallelism', fontweight='bold')
ax2.grid(True, alpha=0.3, axis='y')
# 子图3: 内存占用估计
ax3 = axes[1, 0]
# 假设每个环境状态占用内存
bytes_per_env = 136 * 4 + 6 * 4 # float32
memory_mb = [n * bytes_per_env / (1024**2) for n in env_counts]
ax3.semilogy(env_counts, memory_mb, 'g-s', linewidth=2, markersize=8)
ax3.set_xlabel('Number of Environments')
ax3.set_ylabel('Memory Usage (MB)')
ax3.set_title('Memory Scaling', fontweight='bold')
ax3.grid(True, alpha=0.3, which='both')
# 子图4: GPU利用率模拟
ax4 = axes[1, 1]
n_envs_gpu = np.array([128, 256, 512, 1024, 2048, 4096, 8192])
# 模拟GPU利用率随并行度增加
utilization = 100 * (1 - np.exp(-n_envs_gpu / 2000))
ax4.plot(n_envs_gpu, utilization, 'purple', linewidth=3)
ax4.axhline(y=95, color='red', linestyle='--', label='Target Utilization')
ax4.fill_between(n_envs_gpu, utilization, alpha=0.3, color='purple')
ax4.set_xlabel('Parallel Environments')
ax4.set_ylabel('GPU Utilization (%)')
ax4.set_title('GPU Utilization Scaling', fontweight='bold')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('isaac_gym_parallel.png', dpi=150, bbox_inches='tight')
plt.show()
print("Isaac Gym并行仿真可视化完成")
print(f"最大吞吐量: {max(throughputs):.0f} steps/sec")
print(f"最佳并行数: {env_counts[np.argmax(throughputs)]}")
if __name__ == "__main__":
visualize_parallel_sim()
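上面的模拟器在环境终止后统一 `reset()`,而 Isaac Gym 风格的并行训练通常要求终止的环境就地重置、其余环境不中断地继续步进。下面在同样的简化动力学假设下,给出一个向量化自动重置的示意草图(函数名 `step_with_autoreset` 为示意):

```python
import numpy as np

def step_with_autoreset(states, obj_pos, actions, threshold=0.02):
    """向量化步进: 到达目标的环境就地重新采样初始状态, 其余环境继续。
    states: [n, d]; obj_pos: [n, 3]; actions: [n, d_a], 前3维为位置增量。
    """
    states = states.copy()
    states[:, :3] += actions[:, :3] * 0.01                   # 简化动力学
    dist = np.linalg.norm(states[:, :3] - obj_pos, axis=1)
    rewards = -dist
    dones = dist < threshold
    if dones.any():                                          # 仅重置终止的环境
        states[dones] = np.random.randn(int(dones.sum()), states.shape[1])
    return states, rewards, dones

# 示例: 环境0初始即在目标上, 应被立即重置; 其余环境继续
n, d = 8, 136
states = np.zeros((n, d))
obj = np.ones((n, 3)); obj[0] = 0.0
actions = np.zeros((n, 6))
states2, rewards, dones = step_with_autoreset(states, obj, actions)
```

这种布尔掩码写法让重置本身也是批量操作,避免在 GPU 上逐环境同步,是并行吞吐量能随环境数近线性扩展的前提之一。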