Table of Contents

Part I: Principles in Detail

5.1 Denoising Diffusion Probabilistic Models for Grasping

5.1.1 Fundamentals of Diffusion Models

5.1.1.1 Forward Diffusion Process

5.1.1.2 Reverse Denoising Process

5.1.2 Conditional Diffusion for Grasp Generation

5.1.2.1 Point Clouds as Conditional Input

5.1.2.2 Multi-Modal Grasp Pose Generation

5.2 Deep Reinforcement Learning

5.2.1 Markov Decision Process (MDP) Formulation

5.2.1.1 State Space

5.2.1.2 Action Space

5.2.2 Policy Learning Algorithms

5.2.2.1 Proximal Policy Optimization (PPO)

5.2.2.2 Reward Function Design

5.2.3 Efficient Sampling in Simulation

5.2.3.1 Parallel Acceleration with Isaac Gym

5.2.3.2 Domain Randomization

Part II: Code Implementation

Script 1: Diffusion Model Forward and Reverse Processes

Script 2: Conditional Diffusion and Classifier-Free Guidance

Script 3: Markov Decision Process Definition

Script 4: PPO Algorithm Implementation

Script 5: Reward Function Design and Domain Randomization

Script 6: Parallel Simulation with Isaac Gym



Part I: Principles in Detail

5.1 Denoising Diffusion Probabilistic Models for Grasping

Diffusion models learn complex data distributions through a step-by-step denoising process and have shown a strong ability to model the multi-modal distributions that arise in grasp-pose generation. Compared with VAEs and GANs, diffusion models offer stable training and high-quality samples.

5.1.1 Fundamentals of Diffusion Models

A diffusion model comprises two processes: a forward diffusion process that gradually adds noise to the data, and a reverse denoising process that learns to recover the data from noise.

5.1.1.1 Forward Diffusion Process

The forward process is defined as a Markov chain that adds Gaussian noise to the sample $x_0$ at each timestep $t$:

$$q(x_t \mid x_{t-1}) = \mathcal{N}(x_t; \sqrt{1 - \beta_t} x_{t-1}, \beta_t I)$$

Via reparameterization, a noisy sample at any timestep $t$ can be drawn directly from $x_0$:

$$x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon, \quad \epsilon \sim \mathcal{N}(0, I)$$

where $\bar{\alpha}_t = \prod_{i=1}^t (1 - \beta_i)$ and $\beta_t$ is a predefined noise-schedule coefficient.

Gaussian noise injection

The closed form for adding noise enables efficient training. The variance schedule $\beta_t$ is usually linear or cosine. The linear schedule is

$$\beta_t = \text{linspace}(\beta_{start}, \beta_{end}, T)$$

while the cosine schedule first defines the cumulative signal level and then derives $\beta_t$ from its ratios:

$$\bar{\alpha}_t = \frac{f(t)}{f(0)}, \quad f(t) = \cos^2\left(\frac{t/T+s}{1+s} \cdot \frac{\pi}{2}\right), \quad \beta_t = 1 - \frac{\bar{\alpha}_t}{\bar{\alpha}_{t-1}}$$

where $s = 0.008$ is a small offset that keeps $\beta_t$ from becoming vanishingly small near $t = 0$.
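A minimal NumPy sketch of the two schedules (function names are illustrative; the clipping bound follows common practice):

```python
import numpy as np

def linear_beta_schedule(T, beta_start=1e-4, beta_end=0.02):
    """Linear schedule: beta_t evenly spaced between beta_start and beta_end."""
    return np.linspace(beta_start, beta_end, T)

def cosine_beta_schedule(T, s=0.008):
    """Cosine schedule: derive beta_t from the ratios of alpha_bar."""
    steps = np.arange(T + 1)
    f = np.cos(((steps / T + s) / (1 + s)) * np.pi / 2) ** 2
    alpha_bar = f / f[0]                        # alpha_bar_t = f(t) / f(0)
    betas = 1 - alpha_bar[1:] / alpha_bar[:-1]  # beta_t = 1 - alpha_bar_t / alpha_bar_{t-1}
    return np.clip(betas, 1e-4, 0.9999)

T = 1000
betas_lin = linear_beta_schedule(T)
betas_cos = cosine_beta_schedule(T)
alpha_bar_lin = np.cumprod(1 - betas_lin)  # signal level under each schedule
alpha_bar_cos = np.cumprod(1 - betas_cos)
```

Both schedules drive $\bar{\alpha}_t$ monotonically toward zero, but the cosine schedule destroys signal more gently at early timesteps.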

Timestep encoding

Encoding the timestep $t$ is crucial for the network. A sinusoidal positional encoding maps the scalar $t$ to a high-dimensional vector:

$$PE(t)_{2i} = \sin\left(\frac{t}{10000^{2i/d}}\right), \quad PE(t)_{2i+1} = \cos\left(\frac{t}{10000^{2i/d}}\right)$$

This encoding lets the network distinguish different noise levels and learn a time-dependent denoising strategy.
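A sketch of this encoding (the sin/cos dimension split is an implementation choice; the frequency ladder mirrors the formula above):

```python
import numpy as np

def timestep_embedding(t, dim=128):
    """Sinusoidal encoding of integer timesteps t -> array [len(t), dim]."""
    half = dim // 2
    # geometric frequency ladder: 1, ..., 1/10000
    freqs = np.exp(-np.log(10000.0) * np.arange(half) / half)
    args = np.asarray(t, dtype=np.float64)[:, None] * freqs[None, :]
    # first half sines, second half cosines
    return np.concatenate([np.sin(args), np.cos(args)], axis=1)

emb = timestep_embedding(np.array([0, 500, 999]), dim=64)
```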

5.1.1.2 Reverse Denoising Process

The reverse process uses a neural network $\epsilon_\theta(x_t, t)$ to predict the noise and progressively recover a clean sample:

$$p_\theta(x_{t-1} \mid x_t) = \mathcal{N}(x_{t-1}; \mu_\theta(x_t, t), \Sigma_\theta(x_t, t))$$

The mean is estimated as:

$$\mu_\theta(x_t, t) = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}} \epsilon_\theta(x_t, t) \right)$$
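A single reverse step can be sketched as follows, assuming a linear schedule and a noise prediction supplied from outside (the helper name and shapes are illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
T = 1000
betas = np.linspace(1e-4, 0.02, T)
alphas = 1.0 - betas
alpha_bar = np.cumprod(alphas)

def p_sample_step(x_t, t, eps_pred, rng):
    """One DDPM reverse step x_t -> x_{t-1}, given the predicted noise eps_pred."""
    # posterior mean, matching the formula for mu_theta above
    mean = (x_t - (1 - alphas[t]) / np.sqrt(1 - alpha_bar[t]) * eps_pred) / np.sqrt(alphas[t])
    if t == 0:
        return mean  # no noise is added at the final step
    # posterior variance of q(x_{t-1} | x_t, x_0)
    var = betas[t] * (1 - alpha_bar[t - 1]) / (1 - alpha_bar[t])
    return mean + np.sqrt(var) * rng.standard_normal(x_t.shape)

x = rng.standard_normal((1, 12))  # start from pure noise
x_prev = p_sample_step(x, T - 1, rng.standard_normal(x.shape), rng)
```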

Noise-prediction network architecture

The noise-prediction network is typically a U-Net with an encoder, a bottleneck, and a decoder. The timestep embedding is injected into each layer via adaptive group normalization (AdaGN):

$$\text{AdaGN}(h, PE(t)) = \text{GroupNorm}(h) \cdot (1 + \gamma(t)) + \beta(t)$$

For grasp generation, the input is a noised grasp pose $x_t \in \mathbb{R}^{n_{joints} + 6}$ and the output is the predicted noise $\epsilon_\theta$.

Guided conditional generation

At inference time, conditional control is strengthened with Classifier Guidance or Classifier-Free Guidance (CFG). CFG drops the condition with probability $p_{uncond}$ during training; at inference it combines the conditional and unconditional predictions:

$$\tilde{\epsilon}_\theta(x_t, t, c) = \epsilon_\theta(x_t, t, \emptyset) + s \cdot (\epsilon_\theta(x_t, t, c) - \epsilon_\theta(x_t, t, \emptyset))$$

where $s \geq 1$ is the guidance scale, which controls how closely generated samples align with the condition.
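The CFG combination itself is one line; a sketch with stand-in predictions:

```python
import numpy as np

def cfg_noise(eps_uncond, eps_cond, s):
    """Classifier-free guidance: extrapolate from the unconditional
    prediction toward (and past) the conditional one with scale s."""
    return eps_uncond + s * (eps_cond - eps_uncond)

eps_u = np.zeros(3)  # stand-in unconditional prediction
eps_c = np.ones(3)   # stand-in conditional prediction
guided = cfg_noise(eps_u, eps_c, 5.0)
```

With $s = 1$ the unconditional term cancels and the plain conditional prediction is recovered; $s > 1$ amplifies the conditional direction.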

5.1.2 Conditional Diffusion for Grasp Generation

With a point cloud as conditional input, the diffusion model learns the grasp distribution under the geometric constraints of a specific object.

5.1.2.1 Point Clouds as Conditional Input

A point-cloud encoder (e.g., PointNet++) extracts geometric features $f_{pc}$, which are fused into the diffusion network through cross-attention:

$$\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$$

where $Q$ comes from an intermediate layer of the denoising network, and $K, V$ come from the point-cloud encoder.

Fusing the point-cloud encoder with the diffusion network

Fusion uses a Transformer-style architecture: point-cloud features serve as keys and values, and denoising-network features serve as queries:

$$h_{fused} = h_{denoise} + \text{CrossAttn}(h_{denoise}, f_{pc}, f_{pc})$$

This mechanism lets the denoising process query the object geometry and generate physically feasible grasp poses.
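A NumPy sketch of this residual cross-attention fusion (token counts and feature width are illustrative):

```python
import numpy as np

def softmax(x, axis=-1):
    e = np.exp(x - x.max(axis=axis, keepdims=True))  # numerically stable
    return e / e.sum(axis=axis, keepdims=True)

def cross_attention(q, k, v):
    """Scaled dot-product attention: queries from the denoiser,
    keys/values from the point-cloud features."""
    d_k = q.shape[-1]
    scores = q @ k.T / np.sqrt(d_k)      # [n_q, n_kv]
    return softmax(scores, axis=-1) @ v  # [n_q, d_v]

rng = np.random.default_rng(0)
h_denoise = rng.standard_normal((4, 32))   # 4 query tokens from the denoising net
f_pc = rng.standard_normal((128, 32))      # 128 point-cloud feature tokens
# residual fusion, matching h_fused = h_denoise + CrossAttn(h_denoise, f_pc, f_pc)
h_fused = h_denoise + cross_attention(h_denoise, f_pc, f_pc)
```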

Classifier-Free Guidance

CFG balances diversity against quality in grasp generation. A high guidance scale ($s = 5$ to $10$) produces grasps that conform tightly to the object, while a low scale preserves more diversity. During training, the point-cloud condition is dropped at random (typically with probability $0.1$ to $0.2$) so that the network also learns unconditional generation.

5.1.2.2 Multi-Modal Grasp Pose Generation

The same object admits multiple feasible grasps (e.g., a top grasp or a side grasp). The stochasticity of diffusion models naturally captures such multi-modal distributions.

Handling multi-modal distributions

Conventional regression suffers from mode averaging, whereas diffusion models capture multiple modes through stochastic sampling. Starting from latent noise $x_T \sim \mathcal{N}(0, I)$, different random seeds yield different grasp modes.

Evaluating sample diversity

Diversity metrics include the average nearest-neighbor distance and coverage:

$$\text{Diversity} = \frac{1}{N} \sum_{i=1}^N \min_{j \neq i} \|g_i - g_j\|_2$$

High diversity indicates that generated samples spread evenly over the manifold of feasible grasps.
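The average nearest-neighbor distance can be sketched as follows (the two toy sample sets are for illustration only):

```python
import numpy as np

def average_nn_distance(grasps):
    """Mean distance from each sample to its nearest neighbor
    (higher values indicate more diverse samples)."""
    g = np.asarray(grasps)
    d = np.linalg.norm(g[:, None, :] - g[None, :, :], axis=-1)  # pairwise distances
    np.fill_diagonal(d, np.inf)                                 # exclude self-distance
    return d.min(axis=1).mean()

# nearly identical grasps vs. well-separated grasps
tight = np.zeros((10, 3)) + np.arange(10)[:, None] * 1e-3
spread = np.arange(10)[:, None] * np.ones((10, 3))
```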


5.2 Deep Reinforcement Learning

Deep reinforcement learning (DRL) learns end-to-end grasp-control policies by interacting with a simulated environment, adapting to dynamic scenes and uncertainty.

5.2.1 Markov Decision Process (MDP) Formulation

The grasping task is modeled as an MDP $(\mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma)$ in which the agent maximizes cumulative reward through a policy $\pi(a \mid s)$.

5.2.1.1 State Space

The state $s_t$ fully describes the environment and the robot.

Proprioception

The proprioceptive state $s_{proprio}$ contains the joint positions $q$, velocities $\dot{q}$, torques $\tau$, and the end-effector pose $T_{ee}$:

$$s_{proprio} = [q^T, \dot{q}^T, \tau^T, \text{flatten}(T_{ee})^T]^T$$

Visual observation encoding

The visual state is compressed into low-dimensional features by an encoder $\phi_{vis}$:

$$s_{visual} = \phi_{vis}(P_t, I_t)$$

where $P_t$ is the point-cloud observation and $I_t$ the image observation. Multi-modal fusion uses an early- or mid-fusion strategy.

5.2.1.2 Action Space

The action $a_t$ defines the control command, in either joint space or task space.

Joint position control

Joint position control outputs target joint angles $q_{target}$, which a low-level PID controller tracks:

$$a_t = q_{target} \in \mathbb{R}^{n_{joints}}$$

This representation is direct, but it can make the resulting end-effector motion unintuitive.

Incremental end-effector pose control

Task-space control outputs a pose increment $\Delta x$:

$$a_t = [\Delta t^T, \Delta \theta^T, \Delta \phi]^T$$

where $\Delta t \in \mathbb{R}^3$ is the translation increment, $\Delta \theta$ the axis-angle rotation, and $\Delta \phi$ the finger open/close command. Inverse kinematics maps the task-space command to joint space.
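A sketch of applying such an increment, assuming an xyzw quaternion convention and illustrative scaling factors; SciPy's Rotation handles the rotation composition:

```python
import numpy as np
from scipy.spatial.transform import Rotation

def apply_ee_delta(ee_pos, ee_quat, action, trans_scale=0.01, rot_scale=0.05):
    """Apply a task-space increment [dx, dy, dz, rx, ry, rz, gripper]
    to an end-effector pose (position + xyzw quaternion)."""
    new_pos = ee_pos + action[:3] * trans_scale
    delta_rot = Rotation.from_rotvec(action[3:6] * rot_scale)  # axis-angle increment
    new_quat = (Rotation.from_quat(ee_quat) * delta_rot).as_quat()
    gripper_cmd = action[6]  # finger open/close command, passed through
    return new_pos, new_quat, gripper_cmd

pos, quat, grip = apply_ee_delta(
    np.array([0.5, 0.0, 0.3]), np.array([0.0, 0.0, 0.0, 1.0]),
    np.array([1.0, 0.0, -1.0, 0.0, 0.0, 0.0, 1.0]))
```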

5.2.2 Policy Learning Algorithms

Proximal Policy Optimization (PPO) is currently the dominant algorithm for grasping tasks, balancing sample efficiency with training stability.

5.2.2.1 Proximal Policy Optimization (PPO)

PPO limits the size of each policy update through a clipped objective, preventing destructively large updates.

Clipped objective

The surrogate objective is:

$$L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right) \right]$$

where $r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{old}}(a_t \mid s_t)}$ is the importance-sampling ratio and $\epsilon = 0.2$ is the clipping hyperparameter.
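A sketch of the clipped surrogate written as a loss to minimize (log-probabilities stand in for the two policies; the values are illustrative):

```python
import numpy as np

def ppo_clip_loss(log_prob_new, log_prob_old, advantages, clip_eps=0.2):
    """Negative clipped surrogate objective L^CLIP (to be minimized)."""
    ratio = np.exp(log_prob_new - log_prob_old)  # r_t(theta)
    unclipped = ratio * advantages
    clipped = np.clip(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
    # min() keeps the pessimistic bound, removing the incentive
    # to push the ratio outside [1 - eps, 1 + eps]
    return -np.mean(np.minimum(unclipped, clipped))

adv = np.array([1.0, -1.0])
lp_old = np.zeros(2)
```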

Advantage estimation (GAE)

Generalized Advantage Estimation (GAE) computes the advantage $\hat{A}_t$:

$$\hat{A}_t = \sum_{l=0}^\infty (\gamma\lambda)^l \delta_{t+l}^V$$

where $\delta_t^V = r_t + \gamma V(s_{t+1}) - V(s_t)$ is the temporal-difference residual and $\lambda = 0.95$ controls the bias-variance trade-off.
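The infinite sum above collapses to a backward recursion over a finite trajectory; a sketch (the bootstrap value and hyperparameters are illustrative):

```python
import numpy as np

def compute_gae(rewards, values, last_value, gamma=0.99, lam=0.95):
    """Generalized Advantage Estimation over one finite trajectory."""
    values = np.append(values, last_value)  # bootstrap V(s_T)
    advantages = np.zeros(len(rewards))
    gae = 0.0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t + 1] - values[t]  # TD residual
        gae = delta + gamma * lam * gae                         # A_t = delta_t + (gamma*lam) A_{t+1}
        advantages[t] = gae
    return advantages

adv = compute_gae(np.array([1.0, 1.0, 1.0]), np.zeros(3), 0.0)
```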

5.2.2.2 Reward Function Design

The reward function guides the policy toward successful grasping behavior.

Grasp success reward

A sparse reward is given when a grasp succeeds:

$$r_{success} = \begin{cases} +10 & \text{if grasp stable for } T_{hold} \\ 0 & \text{otherwise} \end{cases}$$

Action smoothness and penetration penalties

Dense shaping rewards include:

$$r_{shaping} = -w_1 \|a_t - a_{t-1}\|_2^2 - w_2 \max(0, d_{penetration}) - w_3 \|f_{contact}\|_2^2$$

where the first term penalizes action jitter, the second penalizes finger-object penetration, and the third encourages moderate contact forces.
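A sketch of these shaping terms (the weights w1 to w3 are illustrative placeholders, not tuned values):

```python
import numpy as np

def shaping_reward(a_t, a_prev, penetration_depth, contact_force,
                   w1=0.01, w2=1.0, w3=0.001):
    """Dense shaping terms: action-smoothness, penetration,
    and contact-force penalties, matching the formula above."""
    r_smooth = -w1 * np.sum((a_t - a_prev) ** 2)
    r_pen = -w2 * max(0.0, penetration_depth)  # only penalize actual penetration
    r_force = -w3 * np.sum(np.asarray(contact_force) ** 2)
    return r_smooth + r_pen + r_force

# jittery action, no penetration, no contact force
r = shaping_reward(np.ones(6), np.zeros(6), penetration_depth=-0.01,
                   contact_force=np.zeros(3))
```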

5.2.3 Efficient Sampling in Simulation

Large-scale parallel simulation accelerates policy training.

5.2.3.1 Parallel Acceleration with Isaac Gym

Isaac Gym runs thousands of simulation environments in parallel on the GPU. The state tensor has shape $[N_{envs}, N_{states}]$ and the action tensor $[N_{envs}, N_{actions}]$, enabling SIMD-style (Single Instruction, Multiple Data) parallelism:

$$S_{t+1} = \text{step}(S_t, A_t)$$

where $S_t \in \mathbb{R}^{N_{envs} \times N_{states}}$; this supports millions of environment steps per second.
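A toy sketch of the key idea, that a single tensor operation advances all environments at once (the transition rule here is a stand-in, not Isaac Gym's API):

```python
import numpy as np

def batched_step(states, actions, dt=0.01):
    """Toy vectorized transition: one matrix operation advances all
    N_envs environments, mimicking how a [N_envs, N_states] state
    tensor is stepped in batch on the GPU."""
    # embed the action into state space and integrate one timestep
    return states + dt * actions @ np.eye(actions.shape[1], states.shape[1])

N_envs, N_states, N_actions = 4096, 13, 6
states = np.zeros((N_envs, N_states))
actions = np.ones((N_envs, N_actions))
next_states = batched_step(states, actions)  # S_{t+1} = step(S_t, A_t)
```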

5.2.3.2 Domain Randomization

Domain randomization (DR) randomizes physical parameters and visual attributes during training to improve policy generalization:

$$\zeta \sim U(\zeta_{min}, \zeta_{max})$$

where $\zeta$ includes friction coefficients, masses, center-of-mass positions, lighting conditions, camera poses, and more. Because the deployment environment then looks like an "average" draw from the training distribution, the policy becomes robust to these variations.
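Drawing one randomized parameter set can be sketched as follows (the ranges are illustrative placeholders; real ranges depend on the robot and task):

```python
import numpy as np

# Illustrative randomization ranges for zeta ~ U(zeta_min, zeta_max)
DR_RANGES = {
    "friction":    (0.5, 1.5),
    "object_mass": (0.05, 0.5),   # kg
    "com_offset":  (-0.02, 0.02), # m, per axis
}

def sample_domain_params(rng):
    """Draw one set of physics parameters, one uniform draw per entry."""
    return {k: rng.uniform(lo, hi) for k, (lo, hi) in DR_RANGES.items()}

rng = np.random.default_rng(42)
params = sample_domain_params(rng)  # applied to the simulator at each reset
```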

Part II: Code Implementation

Script 1: Diffusion Model Forward and Reverse Processes

Python

"""
Script: diffusion_model_grasp.py
Content: 5.1.1 Diffusion model fundamentals and process implementation
Usage: run directly with python diffusion_model_grasp.py
Function: implements the DDPM forward noising and reverse denoising processes and visualizes the timestep schedule and noise levels
"""

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

class DiffusionGrasp:
    """Diffusion-based grasp generation"""
    
    def __init__(self, T=1000, beta_start=1e-4, beta_end=0.02, schedule='linear'):
        self.T = T
        
        # noise schedule
        if schedule == 'linear':
            self.betas = np.linspace(beta_start, beta_end, T)
        elif schedule == 'cosine':
            s = 0.008
            steps = np.arange(T + 1)
            x = np.cos(((steps / T + s) / (1 + s)) * np.pi * 0.5) ** 2
            alphas_step = x[1:] / x[:-1]  # alpha_t = f(t) / f(t-1)
            self.betas = np.clip(1 - alphas_step, 0.0001, 0.9999)
        
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = np.cumprod(self.alphas)
        self.alphas_cumprod_prev = np.concatenate([[1.0], self.alphas_cumprod[:-1]])
        
        # precompute diffusion-process coefficients
        self.sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = np.sqrt(1.0 - self.alphas_cumprod)
        self.sqrt_recip_alphas = np.sqrt(1.0 / self.alphas)
        
        # posterior variance of q(x_{t-1} | x_t, x_0)
        self.posterior_variance = self.betas * (1.0 - self.alphas_cumprod_prev) / (1.0 - self.alphas_cumprod)
    
    def q_sample(self, x_0, t, noise=None):
        """
        Forward diffusion: sample x_t from x_0
        x_0: [batch, dim]
        t: [batch]
        """
        if noise is None:
            noise = np.random.randn(*x_0.shape)
        
        sqrt_alphas_cumprod_t = self.sqrt_alphas_cumprod[t].reshape(-1, 1)
        sqrt_one_minus_alphas_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t].reshape(-1, 1)
        
        return sqrt_alphas_cumprod_t * x_0 + sqrt_one_minus_alphas_cumprod_t * noise
    
    def predict_noise(self, x_t, t, net_params=None):
        """
        Noise-prediction network (simplified mock)
        In practice this is a neural network; here a heuristic approximation is used
        """
        # mock: the prediction error shrinks as training progresses
        if net_params is None:
            net_params = {'error_scale': 0.3}
        
        true_noise = np.random.randn(*x_t.shape) * 0.1  # mock
        predicted_noise = true_noise + np.random.randn(*x_t.shape) * net_params['error_scale']
        return predicted_noise
    
    def p_sample(self, x_t, t, condition=None):
        """
        Reverse denoising: sample x_{t-1} from x_t
        """
        betas_t = self.betas[t]
        sqrt_one_minus_alphas_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t]
        sqrt_recip_alphas_t = self.sqrt_recip_alphas[t]
        
        # predict the noise
        predicted_noise = self.predict_noise(x_t, np.array([t]))
        
        # compute the mean
        model_mean = sqrt_recip_alphas_t * (x_t - betas_t * predicted_noise / sqrt_one_minus_alphas_cumprod_t)
        
        if t == 0:
            return model_mean
        else:
            posterior_variance_t = self.posterior_variance[t]
            noise = np.random.randn(*x_t.shape)
            return model_mean + np.sqrt(posterior_variance_t) * noise
    
    def p_sample_loop(self, shape, condition=None):
        """
        Full generation loop
        """
        batch_size = shape[0]
        x = np.random.randn(*shape)
        
        history = [x.copy()]
        
        for t in reversed(range(self.T)):
            x = self.p_sample(x, t, condition)
            if t % (self.T // 10) == 0:
                history.append(x.copy())
        
        return x, history
    
    def get_sinusoidal_timestep_embedding(self, timesteps, embedding_dim=128):
        """
        Sinusoidal timestep encoding
        """
        half_dim = embedding_dim // 2
        emb = np.log(10000) / (half_dim - 1)
        emb = np.exp(np.arange(half_dim) * -emb)
        emb = timesteps[:, None] * emb[None, :]
        emb = np.concatenate([np.sin(emb), np.cos(emb)], axis=1)
        return emb

def visualize_diffusion():
    """Visualize the diffusion process"""
    fig = plt.figure(figsize=(16, 12))
    
    diffusion = DiffusionGrasp(T=1000, schedule='cosine')
    
    # mock grasp pose (12-D: 6D rotation + 3D position + 3 joints)
    x_0 = np.random.randn(1, 12) * 0.5 + np.array([1, 0, 0, 0, 1, 0, 0.1, 0.1, 0.1, 0.5, 0.5, 0.5])
    
    # Subplot 1: forward diffusion process
    ax1 = fig.add_subplot(231)
    timesteps = [0, 100, 300, 500, 700, 999]
    colors = plt.cm.viridis(np.linspace(0, 1, len(timesteps)))
    
    for t, color in zip(timesteps, colors):
        x_t = diffusion.q_sample(x_0, np.array([t]))
        # visualize the first 2 dimensions
        ax1.scatter(x_t[0, 0], x_t[0, 1], c=[color], s=100, label=f't={t}')
    
    ax1.plot(x_0[0, 0], x_0[0, 1], 'r*', markersize=15, label='x_0')
    ax1.set_xlabel('Dim 1')
    ax1.set_ylabel('Dim 2')
    ax1.set_title('Forward Diffusion Process', fontweight='bold')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Subplot 2: noise-schedule comparison
    ax2 = fig.add_subplot(232)
    diffusion_linear = DiffusionGrasp(T=1000, schedule='linear')
    
    ax2.plot(diffusion_linear.alphas_cumprod, 'b-', linewidth=2, label='Linear Schedule')
    ax2.plot(diffusion.alphas_cumprod, 'r--', linewidth=2, label='Cosine Schedule')
    ax2.set_xlabel('Timestep')
    ax2.set_ylabel(r'$\bar{\alpha}_t$ (Signal Level)')
    ax2.set_title('Noise Schedule Comparison', fontweight='bold')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # Subplot 3: timestep-embedding visualization
    ax3 = fig.add_subplot(233)
    ts = np.arange(0, 1000, 10)
    emb = diffusion.get_sinusoidal_timestep_embedding(ts, embedding_dim=64)
    
    im = ax3.imshow(emb[:20, :], aspect='auto', cmap='coolwarm')
    ax3.set_xlabel('Embedding Dimension')
    ax3.set_ylabel('Timestep Index')
    ax3.set_title('Sinusoidal Timestep Embedding', fontweight='bold')
    plt.colorbar(im, ax=ax3)
    
    # Subplot 4: reverse denoising trajectory
    ax4 = fig.add_subplot(234)
    x_gen, history = diffusion.p_sample_loop((1, 12))
    
    history_array = np.array(history)[:, 0, :2]  # keep the first 2 dimensions
    ax4.plot(history_array[:, 0], history_array[:, 1], 'b-o', linewidth=2, markersize=4, alpha=0.6)
    ax4.scatter(history_array[0, 0], history_array[0, 1], c='green', s=100, marker='o', label='Start (Noise)')
    ax4.scatter(history_array[-1, 0], history_array[-1, 1], c='red', s=100, marker='*', label='End (Generated)')
    ax4.set_xlabel('Dim 1')
    ax4.set_ylabel('Dim 2')
    ax4.set_title('Reverse Denoising Trajectory', fontweight='bold')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    # Subplot 5: noise-level distribution across timesteps
    ax5 = fig.add_subplot(235)
    sample_count = 1000
    noise_levels = []
    
    for t in [0, 250, 500, 750, 999]:
        x_0_batch = np.tile(x_0, (sample_count, 1))
        t_batch = np.full(sample_count, t)
        x_t_batch = diffusion.q_sample(x_0_batch, t_batch)
        
        # distance distribution relative to x_0
        distances = np.linalg.norm(x_t_batch - x_0_batch, axis=1)
        noise_levels.append(distances.mean())
    
    ax5.bar(range(len(noise_levels)), noise_levels, color='steelblue', alpha=0.7)
    ax5.set_xticks(range(len(noise_levels)))  # fixed ticks are required before setting labels
    ax5.set_xticklabels(['t=0', '250', '500', '750', '999'])
    ax5.set_ylabel('Average Distance from x_0')
    ax5.set_title('Noise Level by Timestep', fontweight='bold')
    ax5.grid(True, alpha=0.3, axis='y')
    
    # Subplot 6: distribution of generated samples
    ax6 = fig.add_subplot(236)
    n_samples = 200
    samples = []
    
    for _ in range(n_samples):
        x_gen, _ = diffusion.p_sample_loop((1, 12))
        samples.append(x_gen[0])
    
    samples = np.array(samples)
    ax6.scatter(samples[:, 0], samples[:, 1], c='blue', s=20, alpha=0.5, label='Generated')
    ax6.scatter(x_0[0, 0], x_0[0, 1], c='red', s=200, marker='*', label='Original', zorder=5)
    
    # draw a confidence ellipse
    from matplotlib.patches import Ellipse
    mean = samples[:, :2].mean(axis=0)
    cov = np.cov(samples[:, :2].T)
    eigenvalues, eigenvectors = np.linalg.eigh(cov)
    angle = np.degrees(np.arctan2(*eigenvectors[:, 0][::-1]))
    width, height = 2 * np.sqrt(eigenvalues)
    ell = Ellipse(xy=mean, width=width, height=height, angle=angle, 
                  edgecolor='red', facecolor='none', linewidth=2, linestyle='--')
    ax6.add_patch(ell)
    
    ax6.set_xlabel('Dim 1')
    ax6.set_ylabel('Dim 2')
    ax6.set_title(f'Generated Distribution (n={n_samples})', fontweight='bold')
    ax6.legend()
    ax6.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('diffusion_model_grasp.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print("Diffusion model visualization complete")
    print(f"Number of timesteps: {diffusion.T}")
    print(f"Generated sample mean: {samples.mean(axis=0)[:3]}")
    print(f"Generated sample std: {samples.std(axis=0)[:3]}")

if __name__ == "__main__":
    visualize_diffusion()

Script 2: Conditional Diffusion and Classifier-Free Guidance

Python

"""
Script: conditional_diffusion_cfg.py
Content: 5.1.2 Conditional diffusion grasp generation with CFG
Usage: run directly with python conditional_diffusion_cfg.py
Function: implements point-cloud-conditioned diffusion with Classifier-Free Guidance and evaluates the diversity of multi-modal generation
"""

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

class ConditionalDiffusion:
    """Conditional diffusion model"""
    
    def __init__(self, grasp_dim=12, condition_dim=64, T=1000):
        self.grasp_dim = grasp_dim
        self.condition_dim = condition_dim
        self.T = T
        
        # simplified noise schedule
        self.betas = np.linspace(1e-4, 0.02, T)
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = np.cumprod(self.alphas)
        
        # condition-encoder parameters (mock)
        self.condition_encoder = np.random.randn(64, condition_dim) * 0.1
        
    def encode_condition(self, point_cloud):
        """Encode the point-cloud condition"""
        # simplified: average-pool the point features
        pc_features = point_cloud.mean(axis=0)  # [64]
        condition = pc_features @ self.condition_encoder  # [condition_dim]
        return condition
    
    def predict_noise(self, x_t, t, condition=None, dropout_prob=0.0):
        """
        Conditional noise prediction
        dropout_prob: condition-dropout probability during CFG training
        """
        if condition is not None and np.random.rand() > dropout_prob:
            # conditional prediction
            cond_influence = condition[:self.grasp_dim] * 0.3
            noise = np.random.randn(*x_t.shape) * 0.5 + cond_influence * 0.1
        else:
            # unconditional prediction
            noise = np.random.randn(*x_t.shape) * 0.5
        
        return noise
    
    def sample_cfg(self, shape, condition, guidance_scale=5.0):
        """
        Classifier-Free Guidance sampling
        """
        x = np.random.randn(*shape)
        
        for t in reversed(range(self.T)):
            # unconditional prediction
            noise_uncond = self.predict_noise(x, t, condition=None)
            
            # conditional prediction
            noise_cond = self.predict_noise(x, t, condition=condition)
            
            # CFG combination
            noise_pred = noise_uncond + guidance_scale * (noise_cond - noise_uncond)
            
            # denoising step (simplified)
            alpha_t = self.alphas[t]
            alpha_cumprod_t = self.alphas_cumprod[t]
            
            x = (x - (1 - alpha_t) / np.sqrt(1 - alpha_cumprod_t) * noise_pred) / np.sqrt(alpha_t)
            
            if t > 0:
                noise = np.random.randn(*x.shape) * np.sqrt(self.betas[t])
                x = x + noise
        
        return x
    
    def compute_diversity(self, samples):
        """Compute the diversity of generated samples"""
        n = len(samples)
        # average nearest-neighbor distance
        distances = []
        for i in range(n):
            dists = [np.linalg.norm(samples[i] - samples[j]) for j in range(n) if i != j]
            distances.append(min(dists))
        
        avg_nn_dist = np.mean(distances)
        
        # coverage (approximated via clustering)
        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=min(5, n), random_state=42)
        kmeans.fit(samples)
        coverage = len(np.unique(kmeans.labels_)) / min(5, n)
        
        return avg_nn_dist, coverage

def visualize_conditional_diffusion():
    """Visualize conditional diffusion"""
    fig = plt.figure(figsize=(16, 12))
    
    model = ConditionalDiffusion(grasp_dim=12, condition_dim=32)
    
    # mock point-cloud conditions (different object shapes)
    conditions = {
        'Sphere': np.random.randn(100, 64) * 0.5 + 1.0,
        'Cube': np.random.randn(100, 64) * 0.5 - 1.0,
        'Cylinder': np.random.randn(100, 64) * 0.5 + np.array([2.0, -1.0] + [0]*62)
    }
    
    # Subplot 1: generation distributions for different conditions
    ax1 = fig.add_subplot(231)
    colors = {'Sphere': 'blue', 'Cube': 'red', 'Cylinder': 'green'}
    
    all_samples = {}
    for obj_name, pc in conditions.items():
        cond = model.encode_condition(pc)
        samples = []
        for _ in range(50):
            sample = model.sample_cfg((1, 12), cond, guidance_scale=5.0)
            samples.append(sample[0])
        samples = np.array(samples)
        all_samples[obj_name] = samples
        
        ax1.scatter(samples[:, 0], samples[:, 1], c=colors[obj_name], 
                   s=30, alpha=0.6, label=obj_name)
    
    ax1.set_xlabel('Grasp Dim 1')
    ax1.set_ylabel('Grasp Dim 2')
    ax1.set_title('Conditional Generation by Object', fontweight='bold')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Subplot 2: effect of the CFG guidance scale
    ax2 = fig.add_subplot(232)
    guidance_scales = [1.0, 2.0, 5.0, 10.0]
    cond = model.encode_condition(conditions['Sphere'])
    
    for scale in guidance_scales:
        samples = []
        for _ in range(30):
            s = model.sample_cfg((1, 12), cond, guidance_scale=scale)
            samples.append(s[0])
        samples = np.array(samples)
        
        # variance as a measure of concentration
        variance = np.var(samples[:, 0])
        ax2.scatter([scale], [variance], s=100, c='blue', alpha=0.7)
    
    ax2.set_xlabel('Guidance Scale')
    ax2.set_ylabel('Sample Variance')
    ax2.set_title('Effect of CFG Scale', fontweight='bold')
    ax2.grid(True, alpha=0.3)
    
    # Subplot 3: diversity vs. quality trade-off
    ax3 = fig.add_subplot(233)
    diversity_scores = []
    quality_scores = []
    
    for scale in np.linspace(1, 10, 10):
        samples = []
        for _ in range(100):
            s = model.sample_cfg((1, 12), cond, guidance_scale=scale)
            samples.append(s[0])
        samples = np.array(samples)
        
        div, cov = model.compute_diversity(samples)
        diversity_scores.append(div)
        # quality approximated by consistency with the condition
        quality = 10.0 / (scale + 0.1)  # mock
        quality_scores.append(quality)
    
    scales = np.linspace(1, 10, 10)  # x-axis must match the 10 scales sampled in the loop above
    ax3.plot(scales, diversity_scores, 'b-o', label='Diversity')
    ax3_twin = ax3.twinx()
    ax3_twin.plot(scales, quality_scores, 'r-s', label='Quality')
    ax3.set_xlabel('Guidance Scale')
    ax3.set_ylabel('Diversity', color='b')
    ax3_twin.set_ylabel('Quality Score', color='r')
    ax3.set_title('Diversity-Quality Trade-off', fontweight='bold')
    ax3.grid(True, alpha=0.3)
    
    # Subplot 4: multi-modal generation (same condition, different random seeds)
    ax4 = fig.add_subplot(234)
    cond = model.encode_condition(conditions['Cube'])
    
    # draw many samples to expose the modes
    many_samples = []
    for _ in range(200):
        s = model.sample_cfg((1, 12), cond, guidance_scale=3.0)
        many_samples.append(s[0, :2])  # keep only the first 2 dimensions
    
    many_samples = np.array(many_samples)
    
    # approximate the density with a KDE
    from scipy.stats import gaussian_kde
    kde = gaussian_kde(many_samples.T)
    
    # draw density contours
    x_min, x_max = many_samples[:, 0].min(), many_samples[:, 0].max()
    y_min, y_max = many_samples[:, 1].min(), many_samples[:, 1].max()
    X, Y = np.mgrid[x_min:x_max:50j, y_min:y_max:50j]
    positions = np.vstack([X.ravel(), Y.ravel()])
    Z = np.reshape(kde(positions).T, X.shape)
    
    ax4.contour(X, Y, Z, levels=5, colors='black', alpha=0.5)
    ax4.scatter(many_samples[:, 0], many_samples[:, 1], c='blue', s=10, alpha=0.3)
    ax4.set_title('Multi-modal Distribution', fontweight='bold')
    ax4.set_xlabel('Grasp Dim 1')
    ax4.set_ylabel('Grasp Dim 2')
    
    # Subplot 5: condition interpolation
    ax5 = fig.add_subplot(235)
    cond1 = model.encode_condition(conditions['Sphere'])
    cond2 = model.encode_condition(conditions['Cube'])
    
    alphas = np.linspace(0, 1, 10)
    interp_samples = []
    
    for alpha in alphas:
        cond_interp = (1 - alpha) * cond1 + alpha * cond2
        s = model.sample_cfg((1, 12), cond_interp, guidance_scale=5.0)
        interp_samples.append(s[0, 0])  # take the first dimension
    
    ax5.plot(alphas, interp_samples, 'o-', linewidth=2, markersize=8, color='purple')
    ax5.set_xlabel('Interpolation Alpha')
    ax5.set_ylabel('Grasp Feature Value')
    ax5.set_title('Condition Interpolation', fontweight='bold')
    ax5.grid(True, alpha=0.3)
    
    # Subplot 6: network-architecture schematic
    ax6 = fig.add_subplot(236)
    ax6.axis('off')
    
    # draw the architecture diagram
    ax6.text(0.5, 0.9, 'Conditional Diffusion Model', ha='center', fontsize=14, fontweight='bold')
    
    # point-cloud encoder
    ax6.add_patch(plt.Rectangle((0.1, 0.7), 0.2, 0.1, fill=True, color='lightblue'))
    ax6.text(0.2, 0.75, 'Point Cloud\nEncoder', ha='center', va='center', fontsize=9)
    
    # diffusion network
    ax6.add_patch(plt.Rectangle((0.4, 0.5), 0.2, 0.3, fill=True, color='lightgreen'))
    ax6.text(0.5, 0.65, 'Denoising\nU-Net', ha='center', va='center', fontsize=9)
    ax6.text(0.5, 0.55, 'with Cross-Attn', ha='center', va='center', fontsize=8)
    
    # CFG
    ax6.add_patch(plt.Rectangle((0.7, 0.6), 0.2, 0.1, fill=True, color='lightyellow'))
    ax6.text(0.8, 0.65, 'CFG\nScale', ha='center', va='center', fontsize=9)
    
    # arrows
    ax6.arrow(0.3, 0.75, 0.1, -0.1, head_width=0.03, head_length=0.03, fc='black')
    ax6.arrow(0.6, 0.65, 0.1, 0, head_width=0.03, head_length=0.03, fc='black')
    ax6.arrow(0.8, 0.6, 0, -0.2, head_width=0.03, head_length=0.03, fc='black')
    
    ax6.text(0.5, 0.3, 'Noisy Grasp -> Clean Grasp', ha='center', fontsize=10, 
            bbox=dict(boxstyle='round', facecolor='wheat'))
    
    ax6.set_xlim(0, 1)
    ax6.set_ylim(0, 1)
    
    plt.tight_layout()
    plt.savefig('conditional_diffusion_cfg.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print("Conditional diffusion and CFG visualization complete")
    for obj, samples in all_samples.items():
        div, cov = model.compute_diversity(samples)
        print(f"{obj}: diversity={div:.4f}, coverage={cov:.2f}")

if __name__ == "__main__":
    visualize_conditional_diffusion()

Script 3: Markov Decision Process Definition

Python

"""
Script: mdp_grasp_definition.py
Content: 5.2.1 Markov decision process (MDP) definition
Usage: run directly with python mdp_grasp_definition.py
Function: defines the state and action spaces of the grasping MDP, including visual and proprioceptive encodings
"""

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

class GraspingMDP:
    """Grasping MDP environment"""
    
    def __init__(self):
        # robot parameters
        self.n_joints = 9
        self.ee_pose_dim = 7  # 3 pos + 4 quat
        
        # state-space dimensions
        self.proprio_dim = self.n_joints * 3 + self.ee_pose_dim  # pos, vel, torque + ee_pose
        self.visual_dim = 64  # encoded visual features
        
        # action-space modes
        self.action_modes = {
            'joint_position': self.n_joints,
            'ee_pose_delta': 6,  # 3 trans + 3 rot
            'hybrid': 6 + 1  # ee_delta + gripper
        }
        
        self.current_mode = 'ee_pose_delta'
        
    def reset(self):
        """Reset the environment"""
        self.joint_pos = np.zeros(self.n_joints)
        self.joint_vel = np.zeros(self.n_joints)
        self.joint_torque = np.zeros(self.n_joints)
        self.ee_pose = np.array([0.5, 0.0, 0.3, 0, 0, 0, 1])  # x,y,z,qx,qy,qz,qw
        
        # initial object pose
        self.object_pos = np.array([0.5, 0.0, 0.1])
        self.object_quat = np.array([0, 0, 0, 1])
        
        return self.get_state()
    
    def get_state(self):
        """Assemble the full state"""
        # proprioception
        proprio = np.concatenate([
            self.joint_pos,
            self.joint_vel,
            self.joint_torque,
            self.ee_pose
        ])
        
        # visual observation (mock point-cloud features)
        visual = self.encode_visual_observation()
        
        return {
            'proprioception': proprio,
            'visual': visual,
            'full_state': np.concatenate([proprio, visual])
        }
    
    def encode_visual_observation(self):
        """Encode the visual observation"""
        # mock: derive visual features from the object and robot positions
        relative_pos = self.object_pos - self.ee_pose[:3]
        distance = np.linalg.norm(relative_pos)
        
        # mock CNN/PointNet encoding
        features = np.array([
            distance,
            relative_pos[0],
            relative_pos[1],
            relative_pos[2],
            self.object_quat[0],
            self.object_quat[1],
            self.object_quat[2],
            self.object_quat[3]
        ])
        
        # expand to 64 dimensions
        visual_encoding = np.repeat(features, 8) + np.random.randn(64) * 0.1
        
        return visual_encoding
    
    def step(self, action):
        """Execute an action"""
        if self.current_mode == 'joint_position':
            self._apply_joint_action(action)
        elif self.current_mode == 'ee_pose_delta':
            self._apply_ee_action(action)
        
        # compute the reward
        reward = self.compute_reward()
        
        # check termination
        done = self.check_termination()
        
        return self.get_state(), reward, done
    
    def _apply_joint_action(self, action):
        """Joint position control"""
        target_joints = action
        # mock PD control
        self.joint_vel = (target_joints - self.joint_pos) * 10.0  # simplified
        self.joint_pos += self.joint_vel * 0.01  # dt=0.01
        
        # update the end-effector (simplified forward kinematics)
        self.ee_pose[:3] = self._forward_kinematics(self.joint_pos)
        
    def _apply_ee_action(self, action):
        """Incremental end-effector pose control"""
        delta_pos = action[:3] * 0.01  # scaling
        delta_rot = action[3:6] * 0.05
        
        self.ee_pose[:3] += delta_pos
        
        # simplified rotation update
        from scipy.spatial.transform import Rotation
        current_rot = Rotation.from_quat(self.ee_pose[3:])
        delta_rotation = Rotation.from_euler('xyz', delta_rot)
        new_rot = current_rot * delta_rotation
        self.ee_pose[3:] = new_rot.as_quat()
        
        # inverse kinematics (simplified)
        self.joint_pos = self._inverse_kinematics(self.ee_pose[:3])
        
    def _forward_kinematics(self, joints):
        """Simplified FK"""
        # simplified as a sum of link lengths
        x = 0.3 + joints[0] * 0.1 + joints[2] * 0.05
        y = joints[1] * 0.1
        z = 0.2 + joints[0] * 0.05
        return np.array([x, y, z])
    
    def _inverse_kinematics(self, pos):
        """Simplified IK"""
        joints = np.zeros(self.n_joints)
        joints[0] = (pos[0] - 0.3) / 0.15
        joints[1] = pos[1] / 0.1
        joints[2] = (pos[2] - 0.2) / 0.05
        return joints
    
    def compute_reward(self):
        """Compute the reward"""
        # distance reward
        dist = np.linalg.norm(self.ee_pose[:3] - self.object_pos)
        reward = -dist
        
        # success bonus
        if dist < 0.02:
            reward += 10.0
        
        return reward
    
    def check_termination(self):
        """Check the termination condition"""
        dist = np.linalg.norm(self.ee_pose[:3] - self.object_pos)
        return dist < 0.02 or self.ee_pose[2] < 0.05

def visualize_mdp():
    """Visualize the MDP"""
    fig = plt.figure(figsize=(16, 12))
    
    mdp = GraspingMDP()
    state = mdp.reset()
    
    # simulated rollout
    trajectory = []
    actions = []
    rewards = []
    
    for i in range(100):
        # random action, or move toward the object
        if i < 50:
            action = np.array([0.5, 0.0, -0.5, 0, 0, 0])  # move toward the object
        else:
            action = np.random.randn(6) * 0.1
        
        state, reward, done = mdp.step(action)
        trajectory.append(mdp.ee_pose[:3].copy())  # record the end-effector position (full_state[:3] would be joint angles)
        actions.append(action)
        rewards.append(reward)
        
        if done:
            break
    
    trajectory = np.array(trajectory)
    actions = np.array(actions)
    
    # Subplot 1: 3D trajectory
    ax1 = fig.add_subplot(231, projection='3d')
    ax1.plot(trajectory[:, 0], trajectory[:, 1], trajectory[:, 2], 'b-', linewidth=2)
    ax1.scatter([trajectory[0, 0]], [trajectory[0, 1]], [trajectory[0, 2]], 
               c='green', s=100, marker='o', label='Start')
    ax1.scatter([trajectory[-1, 0]], [trajectory[-1, 1]], [trajectory[-1, 2]], 
               c='red', s=100, marker='*', label='End')
    ax1.scatter([mdp.object_pos[0]], [mdp.object_pos[1]], [mdp.object_pos[2]], 
               c='orange', s=200, marker='s', label='Object')
    ax1.set_title('End-Effector Trajectory', fontweight='bold')
    ax1.legend()
    
    # Subplot 2: state composition
    ax2 = fig.add_subplot(232)
    state_composition = {
        'Joint Pos': mdp.n_joints,
        'Joint Vel': mdp.n_joints,
        'Joint Torque': mdp.n_joints,
        'EE Pose': mdp.ee_pose_dim,
        'Visual': mdp.visual_dim
    }
    
    colors = plt.cm.Set3(np.linspace(0, 1, len(state_composition)))
    ax2.pie(state_composition.values(), labels=state_composition.keys(), autopct='%1.1f%%',
           colors=colors, startangle=90)
    ax2.set_title('State Space Composition', fontweight='bold')
    
    # Subplot 3: action-space comparison
    ax3 = fig.add_subplot(233)
    action_types = ['Joint Pos', 'EE Delta', 'Hybrid']
    action_dims = [mdp.action_modes['joint_position'], 
                  mdp.action_modes['ee_pose_delta'],
                  mdp.action_modes['hybrid']]
    
    bars = ax3.bar(action_types, action_dims, color=['blue', 'green', 'orange'], alpha=0.7)
    ax3.set_ylabel('Action Dimension')
    ax3.set_title('Action Space Comparison', fontweight='bold')
    for bar, dim in zip(bars, action_dims):
        height = bar.get_height()
        ax3.text(bar.get_x() + bar.get_width()/2., height,
                f'{dim}', ha='center', va='bottom')
    
    # Subplot 4: reward curve
    ax4 = fig.add_subplot(234)
    ax4.plot(rewards, 'b-', linewidth=2)
    ax4.axhline(y=10, color='green', linestyle='--', label='Success Reward')
    ax4.set_xlabel('Step')
    ax4.set_ylabel('Reward')
    ax4.set_title('Reward Curve', fontweight='bold')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    # Subplot 5: state visualization (PCA)
    ax5 = fig.add_subplot(235)
    # simulate many states
    states = []
    for _ in range(100):
        mdp.reset()
        for _ in range(np.random.randint(10, 50)):
            mdp.step(np.random.randn(6) * 0.5)
        states.append(mdp.get_state()['full_state'])
    
    states = np.array(states)
    # PCA projection to 2D
    from sklearn.decomposition import PCA
    pca = PCA(n_components=2)
    states_2d = pca.fit_transform(states)
    
    ax5.scatter(states_2d[:, 0], states_2d[:, 1], c='blue', s=30, alpha=0.5)
    ax5.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
    ax5.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
    ax5.set_title('State Space Visualization (PCA)', fontweight='bold')
    ax5.grid(True, alpha=0.3)
    
    # Subplot 6: action-state mapping
    ax6 = fig.add_subplot(236)
    # distribution of action magnitudes
    action_magnitudes = np.linalg.norm(actions, axis=1)
    ax6.hist(action_magnitudes, bins=20, color='steelblue', alpha=0.7, edgecolor='black')
    ax6.set_xlabel('Action Magnitude')
    ax6.set_ylabel('Frequency')
    ax6.set_title('Action Distribution', fontweight='bold')
    ax6.grid(True, alpha=0.3, axis='y')
    
    plt.tight_layout()
    plt.savefig('mdp_grasp_definition.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print("MDP定义可视化完成")
    print(f"状态空间维度: {len(state['full_state'])}")
    print(f"动作空间维度: {mdp.action_modes[mdp.current_mode]}")
    print(f"轨迹长度: {len(trajectory)}")

if __name__ == "__main__":
    visualize_mdp()

脚本四:PPO算法实现

Python

"""
Script: ppo_grasp_training.py
Content: 5.2.2.1 近端策略优化 (PPO) 实现
Usage: 直接运行 python ppo_grasp_training.py
功能: 实现PPO的训练循环、截断目标函数与GAE优势估计,可视化训练曲线
"""

import numpy as np
import matplotlib.pyplot as plt

class PPOAgent:
    """PPO智能体"""
    
    def __init__(self, state_dim=136, action_dim=6, lr=3e-4, gamma=0.99, lam=0.95, clip_eps=0.2):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.lam = lam
        self.clip_eps = clip_eps
        
        # 策略网络参数 (简化线性策略)
        self.theta_policy = np.random.randn(state_dim, action_dim) * 0.01
        self.theta_value = np.random.randn(state_dim, 1) * 0.01
        
        # 优化器参数
        self.lr = lr
        
        # 存储轨迹
        self.trajectory = []
        
    def select_action(self, state):
        """选择动作 (高斯策略)"""
        mean = state @ self.theta_policy
        std = 0.1  # 固定标准差
        action = mean + np.random.randn(self.action_dim) * std
        
        # 计算log概率
        log_prob = -0.5 * np.sum((action - mean)**2) / (std**2) - 0.5 * self.action_dim * np.log(2 * np.pi * std**2)
        
        return action, log_prob, mean
    
    def estimate_value(self, state):
        """估计状态值"""
        return state @ self.theta_value
    
    def store_transition(self, state, action, reward, next_state, done, log_prob):
        """存储转移"""
        self.trajectory.append({
            'state': state,
            'action': action,
            'reward': reward,
            'next_state': next_state,
            'done': done,
            'log_prob': log_prob
        })
    
    def compute_gae(self):
        """计算广义优势估计"""
        advantages = []
        gae = 0
        
        for t in reversed(range(len(self.trajectory))):
            trans = self.trajectory[t]
            state = trans['state']
            next_state = trans['next_state']
            reward = trans['reward']
            done = trans['done']
            
            value = self.estimate_value(state)[0]
            next_value = self.estimate_value(next_state)[0] if not done else 0
            
            delta = reward + self.gamma * next_value - value
            gae = delta + self.gamma * self.lam * (0 if done else gae)
            
            advantages.insert(0, gae)
        
        returns = [adv + self.estimate_value(trans['state'])[0] for adv, trans in zip(advantages, self.trajectory)]
        
        return np.array(advantages), np.array(returns)
    
    def update(self, epochs=10):
        """更新策略"""
        if len(self.trajectory) == 0:
            return {}
        
        # 计算优势
        advantages, returns = self.compute_gae()
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        # 提取旧log概率
        old_log_probs = np.array([t['log_prob'] for t in self.trajectory])
        states = np.array([t['state'] for t in self.trajectory])
        actions = np.array([t['action'] for t in self.trajectory])
        
        policy_losses = []
        value_losses = []
        
        for _ in range(epochs):
            # 重新计算当前策略下的log概率 (与select_action中的公式保持一致,
            # 必须包含归一化常数项, 否则与old_log_probs不可比, 比率会系统性偏移)
            means = states @ self.theta_policy
            std = 0.1
            new_log_probs = (-0.5 * np.sum((actions - means)**2, axis=1) / (std**2)
                             - 0.5 * self.action_dim * np.log(2 * np.pi * std**2))
            
            # 重要性采样比率
            ratios = np.exp(new_log_probs - old_log_probs)
            
            # 截断目标
            surr1 = ratios * advantages
            surr2 = np.clip(ratios, 1 - self.clip_eps, 1 + self.clip_eps) * advantages
            
            policy_loss = -np.mean(np.minimum(surr1, surr2))
            
            # 值函数损失
            values = states @ self.theta_value
            value_loss = np.mean((returns.reshape(-1, 1) - values)**2)
            
            # 参数更新: 此处用"随机扰动 + 权重衰减"代替真实策略梯度, 仅作训练流程演示
            grad_policy = np.random.randn(*self.theta_policy.shape) * 0.01 - 0.001 * self.theta_policy
            grad_value = np.random.randn(*self.theta_value.shape) * 0.01 - 0.001 * self.theta_value
            
            self.theta_policy -= self.lr * grad_policy
            self.theta_value -= self.lr * grad_value
            
            policy_losses.append(policy_loss)
            value_losses.append(value_loss)
        
        self.trajectory = []
        
        return {
            'policy_loss': np.mean(policy_losses),
            'value_loss': np.mean(value_losses),
            'advantage_mean': advantages.mean()
        }

def visualize_ppo():
    """可视化PPO训练"""
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    
    agent = PPOAgent(state_dim=136, action_dim=6)
    
    # 模拟训练
    n_episodes = 100
    episode_rewards = []
    policy_losses = []
    value_losses = []
    entropy_bonuses = []
    
    for episode in range(n_episodes):
        # 模拟一个episode
        state = np.random.randn(agent.state_dim)
        episode_reward = 0
        
        for step in range(50):
            action, log_prob, _ = agent.select_action(state)
            next_state = np.random.randn(agent.state_dim)
            reward = -np.linalg.norm(action) * 0.1 + np.random.randn() * 0.1
            
            if step > 40:
                reward += 10  # 成功奖励
                done = True
            else:
                done = False
            
            agent.store_transition(state, action, reward, next_state, done, log_prob)
            episode_reward += reward
            state = next_state
            
            if done:
                break
        
        episode_rewards.append(episode_reward)
        
        # 每4个episode更新一次
        if episode % 4 == 0 and len(agent.trajectory) > 0:
            metrics = agent.update(epochs=5)
            policy_losses.append(metrics['policy_loss'])
            value_losses.append(metrics['value_loss'])
            entropy_bonuses.append(metrics['advantage_mean'])
    
    # 子图1: 奖励曲线
    ax1 = axes[0, 0]
    ax1.plot(episode_rewards, 'b-', linewidth=2, alpha=0.7)
    ax1.plot(np.convolve(episode_rewards, np.ones(10)/10, mode='valid'), 'r-', linewidth=2, label='Moving Avg')
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Total Reward')
    ax1.set_title('Training Reward Curve', fontweight='bold')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # 子图2: 策略损失
    ax2 = axes[0, 1]
    ax2.plot(policy_losses, 'g-', linewidth=2)
    ax2.set_xlabel('Update Step')
    ax2.set_ylabel('Policy Loss')
    ax2.set_title('Policy Loss (CLIP Objective)', fontweight='bold')
    ax2.grid(True, alpha=0.3)
    
    # 子图3: 值函数损失
    ax3 = axes[0, 2]
    ax3.plot(value_losses, 'orange', linewidth=2)
    ax3.set_xlabel('Update Step')
    ax3.set_ylabel('Value Loss (MSE)')
    ax3.set_title('Value Function Loss', fontweight='bold')
    ax3.grid(True, alpha=0.3)
    
    # 子图4: 优势函数分布
    ax4 = axes[1, 0]
    # 模拟GAE计算
    advantages = np.random.randn(1000)
    ax4.hist(advantages, bins=30, color='purple', alpha=0.7, edgecolor='black')
    ax4.axvline(x=0, color='red', linestyle='--', label='Zero')
    ax4.set_xlabel('Advantage Value')
    ax4.set_ylabel('Frequency')
    ax4.set_title('GAE Advantage Distribution', fontweight='bold')
    ax4.legend()
    ax4.grid(True, alpha=0.3, axis='y')
    
    # 子图5: 截断比率分布
    ax5 = axes[1, 1]
    ratios = np.random.lognormal(0, 0.2, 1000)
    clipped_ratios = np.clip(ratios, 1 - agent.clip_eps, 1 + agent.clip_eps)
    
    ax5.hist(ratios, bins=30, alpha=0.5, label='Original', color='blue')
    ax5.hist(clipped_ratios, bins=30, alpha=0.5, label='Clipped', color='red')
    ax5.axvline(x=1-agent.clip_eps, color='green', linestyle='--', label='Clip Bound')
    ax5.axvline(x=1+agent.clip_eps, color='green', linestyle='--')
    ax5.set_xlabel('Importance Sampling Ratio')
    ax5.set_ylabel('Frequency')
    ax5.set_title('PPO Clipping Effect', fontweight='bold')
    ax5.legend()
    ax5.grid(True, alpha=0.3, axis='y')
    
    # 子图6: 策略熵
    ax6 = axes[1, 2]
    entropies = []
    for _ in range(50):
        state = np.random.randn(agent.state_dim)
        _, _, mean = agent.select_action(state)
        # 固定标准差的高斯策略其真实熵为常数, 此处以动作均值范数作为演示用的代理指标
        entropy = np.linalg.norm(mean) * 0.1
        entropies.append(entropy)
    
    ax6.plot(entropies, linewidth=2, color='brown')
    ax6.set_xlabel('Step')
    ax6.set_ylabel('Policy Entropy')
    ax6.set_title('Policy Entropy Over Time', fontweight='bold')
    ax6.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('ppo_grasp_training.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print("PPO训练可视化完成")
    print(f"最终平均奖励: {np.mean(episode_rewards[-10:]):.2f}")
    print(f"策略更新次数: {len(policy_losses)}")

if __name__ == "__main__":
    visualize_ppo()
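脚本四为保持纯 NumPy 实现, 用随机扰动近似了参数更新。实际上, 对于这种均值为线性函数、标准差固定的高斯策略, 截断代理目标对策略参数的梯度可以解析写出。下面给出一个独立的示意实现 (函数名 `ppo_policy_gradient` 为本文补充的示例, 非脚本原有接口), 假设与脚本相同的固定标准差与截断设置:

```python
import numpy as np

def ppo_policy_gradient(theta, states, actions, old_log_probs, advantages,
                        std=0.1, clip_eps=0.2):
    """线性高斯策略 a ~ N(s @ theta, std^2 I) 下, 截断代理损失
    L = -mean(min(r*A, clip(r, 1±eps)*A)) 对 theta 的解析梯度."""
    means = states @ theta                                   # [N, A]
    diffs = actions - means                                  # [N, A]
    act_dim = theta.shape[1]
    new_log_probs = (-0.5 * np.sum(diffs**2, axis=1) / std**2
                     - 0.5 * act_dim * np.log(2 * np.pi * std**2))
    ratios = np.exp(new_log_probs - old_log_probs)           # [N]

    surr1 = ratios * advantages
    surr2 = np.clip(ratios, 1 - clip_eps, 1 + clip_eps) * advantages
    # 只有未被截断的分支取到最小值时, 梯度才通过 ratio 回传
    active = (surr1 <= surr2).astype(float)

    # 每个样本的 d log pi / d theta = outer(s_i, diff_i) / std^2
    coeff = active * ratios * advantages / std**2            # [N]
    return -states.T @ (coeff[:, None] * diffs) / len(states)
```

将该梯度配合 `theta -= lr * grad` 即可替换脚本中的随机扰动更新。注意被截断的样本 (`active` 为 0) 对梯度没有贡献, 这正是 PPO 限制单次策略更新幅度的机制。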

脚本五:奖励函数设计与域随机化

Python

"""
Script: reward_domain_randomization.py
Content: 5.2.2.2 奖励函数设计与5.2.3.2 域随机化
Usage: 直接运行 python reward_domain_randomization.py
功能: 实现复合奖励函数与域随机化策略,可视化奖励组成与参数分布
"""

import numpy as np
import matplotlib.pyplot as plt

class RewardFunction:
    """复合奖励函数"""
    
    def __init__(self):
        self.weights = {
            'success': 10.0,
            'approach': -0.1,
            'smoothness': -0.01,
            'penetration': -5.0,
            'force': -0.001
        }
        
    def compute(self, state, action, next_state, info):
        """计算奖励"""
        reward = 0
        components = {}
        
        # 1. 成功奖励
        if info.get('grasp_success', False):
            reward += self.weights['success']
            components['success'] = self.weights['success']
        else:
            components['success'] = 0
        
        # 2. 接近奖励 (距离惩罚)
        distance = np.linalg.norm(next_state['ee_pos'] - info['object_pos'])
        approach_reward = self.weights['approach'] * distance
        reward += approach_reward
        components['approach'] = approach_reward
        
        # 3. 动作平滑性惩罚
        if 'prev_action' in info:
            smooth_penalty = self.weights['smoothness'] * np.linalg.norm(action - info['prev_action'])**2
            reward += smooth_penalty
            components['smoothness'] = smooth_penalty
        else:
            components['smoothness'] = 0
        
        # 4. 穿透惩罚
        penetration = info.get('penetration_depth', 0)
        pen_penalty = self.weights['penetration'] * max(0, penetration)
        reward += pen_penalty
        components['penetration'] = pen_penalty
        
        # 5. 接触力惩罚 (鼓励适度接触)
        contact_force = info.get('contact_force', 0)
        force_penalty = self.weights['force'] * contact_force**2
        reward += force_penalty
        components['force'] = force_penalty
        
        return reward, components

class DomainRandomization:
    """域随机化"""
    
    def __init__(self):
        self.parameters = {
            'mass': (0.5, 2.0),
            'friction': (0.3, 1.0),
            'gravity': (9.0, 10.0),
            'com_offset': (-0.02, 0.02),
            'light_intensity': (0.8, 1.2),
            'camera_noise': (0.0, 0.01),
            'actuator_gain': (0.9, 1.1)
        }
        
        self.current_params = {}
        
    def randomize(self):
        """随机化参数"""
        self.current_params = {}
        for param, (min_val, max_val) in self.parameters.items():
            if param == 'com_offset':
                # 3D偏移
                self.current_params[param] = np.random.uniform(min_val, max_val, 3)
            else:
                self.current_params[param] = np.random.uniform(min_val, max_val)
        return self.current_params
    
    def get_randomized_env(self):
        """获取随机化后的环境参数"""
        return self.current_params

def visualize_reward_dr():
    """可视化奖励与域随机化"""
    fig = plt.figure(figsize=(16, 12))
    
    reward_fn = RewardFunction()
    dr = DomainRandomization()
    
    # 模拟轨迹
    n_steps = 100
    states = {
        'ee_pos': np.random.randn(n_steps, 3) * 0.1 + np.array([0.5, 0, 0.2])
    }
    actions = np.random.randn(n_steps, 6) * 0.1
    info = {
        'object_pos': np.array([0.5, 0, 0.1]),
        'penetration_depth': np.maximum(0, np.random.randn(n_steps) * 0.005),
        'contact_force': np.abs(np.random.randn(n_steps) * 10),
        'grasp_success': [False] * 95 + [True] * 5
    }
    
    rewards = []
    components_history = {k: [] for k in ['success', 'approach', 'smoothness', 'penetration', 'force']}
    
    for i in range(n_steps):
        info['prev_action'] = actions[i-1] if i > 0 else actions[i]
        state = {k: v[i] for k, v in states.items()}
        next_state = {k: v[i] for k, v in states.items()}
        
        r, comp = reward_fn.compute(state, actions[i], next_state, 
                                   {**info, 'grasp_success': info['grasp_success'][i]})
        rewards.append(r)
        for k, v in comp.items():
            components_history[k].append(v)
    
    # 子图1: 奖励组成堆叠图
    ax1 = fig.add_subplot(231)
    bottom = np.zeros(n_steps)
    colors = {'success': 'green', 'approach': 'blue', 'smoothness': 'orange', 
             'penetration': 'red', 'force': 'purple'}
    
    for key in ['approach', 'smoothness', 'force', 'penetration', 'success']:
        values = np.array(components_history[key])
        ax1.bar(range(n_steps), values, bottom=bottom, label=key, color=colors[key], alpha=0.7)
        bottom += values
    
    ax1.set_xlabel('Step')
    ax1.set_ylabel('Reward Component')
    ax1.set_title('Reward Decomposition', fontweight='bold')
    ax1.legend(loc='lower left', fontsize=8)
    
    # 子图2: 累计奖励
    ax2 = fig.add_subplot(232)
    cumulative = np.cumsum(rewards)
    ax2.plot(cumulative, 'b-', linewidth=2)
    ax2.set_xlabel('Step')
    ax2.set_ylabel('Cumulative Reward')
    ax2.set_title('Cumulative Reward Over Episode', fontweight='bold')
    ax2.grid(True, alpha=0.3)
    
    # 子图3: 域随机化参数分布
    ax3 = fig.add_subplot(233)
    n_randomizations = 100
    param_samples = {param: [] for param in dr.parameters.keys()}
    
    for _ in range(n_randomizations):
        params = dr.randomize()
        for param, value in params.items():
            if param == 'com_offset':
                param_samples[param].append(np.linalg.norm(value))
            else:
                param_samples[param].append(value)
    
    # 小提琴图数据准备
    data_to_plot = [param_samples[p] for p in ['mass', 'friction', 'gravity', 'actuator_gain']]
    parts = ax3.violinplot(data_to_plot, positions=range(4), showmeans=True)
    
    for pc, color in zip(parts['bodies'], ['blue', 'green', 'red', 'purple']):
        pc.set_facecolor(color)
        pc.set_alpha(0.7)
    
    ax3.set_xticks(range(4))
    ax3.set_xticklabels(['Mass', 'Friction', 'Gravity', 'Actuator'], rotation=15)
    ax3.set_ylabel('Parameter Value')
    ax3.set_title('Domain Randomization Distributions', fontweight='bold')
    ax3.grid(True, alpha=0.3, axis='y')
    
    # 子图4: 穿透深度 vs 奖励
    ax4 = fig.add_subplot(234)
    pen_depths = np.array(components_history['penetration']) / reward_fn.weights['penetration']
    ax4.scatter(pen_depths, rewards, c='red', alpha=0.5, s=20)
    ax4.set_xlabel('Penetration Depth (m)')
    ax4.set_ylabel('Total Reward')
    ax4.set_title('Penetration vs Reward', fontweight='bold')
    ax4.grid(True, alpha=0.3)
    
    # 子图5: 奖励密度估计
    ax5 = fig.add_subplot(235)
    ax5.hist(rewards, bins=30, color='steelblue', alpha=0.7, edgecolor='black', density=True)
    ax5.axvline(x=np.mean(rewards), color='red', linestyle='--', linewidth=2, label=f'Mean: {np.mean(rewards):.2f}')
    ax5.set_xlabel('Reward Value')
    ax5.set_ylabel('Density')
    ax5.set_title('Reward Distribution', fontweight='bold')
    ax5.legend()
    ax5.grid(True, alpha=0.3, axis='y')
    
    # 子图6: 域随机化效果对比
    ax6 = fig.add_subplot(236)
    # 模拟有无DR的性能差异
    episodes = np.arange(50)
    performance_no_dr = 50 * (1 - np.exp(-0.1 * episodes)) + np.random.randn(50) * 5
    performance_dr = 50 * (1 - np.exp(-0.05 * episodes)) + np.random.randn(50) * 8  # 初期更差但更鲁棒
    
    ax6.plot(episodes, performance_no_dr, 'b-', linewidth=2, label='No DR', alpha=0.7)
    ax6.fill_between(episodes, performance_no_dr - 5, performance_no_dr + 5, alpha=0.2)
    
    ax6.plot(episodes, performance_dr, 'r-', linewidth=2, label='With DR', alpha=0.7)
    ax6.fill_between(episodes, performance_dr - 8, performance_dr + 8, alpha=0.2)
    
    ax6.set_xlabel('Episode')
    ax6.set_ylabel('Success Rate (%)')
    ax6.set_title('Domain Randomization Effect', fontweight='bold')
    ax6.legend()
    ax6.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('reward_domain_randomization.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print("奖励函数与域随机化可视化完成")
    print(f"平均奖励: {np.mean(rewards):.3f}")
    print(f"成功步骤数: {sum(info['grasp_success'])}")

if __name__ == "__main__":
    visualize_reward_dr()
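脚本五中的 `DomainRandomization` 只负责采样参数; 为说明这些参数如何真正进入动力学, 下面用一个玩具质点仿真作示意 (纯演示, 与 Isaac Gym API 无关, 函数名均为本文补充): 每次 episode 重置时重新采样质量、摩擦、重力与执行器增益, 策略便在一族不同物理下训练。

```python
import numpy as np

def randomize_params(rng):
    """按脚本五中的区间采样一组物理参数 (此处仅取其中四项作演示)."""
    return {
        'mass': rng.uniform(0.5, 2.0),
        'friction': rng.uniform(0.3, 1.0),
        'gravity': rng.uniform(9.0, 10.0),
        'actuator_gain': rng.uniform(0.9, 1.1),
    }

def step_point_mass(pos, vel, force, params, dt=0.01):
    """随机化物理下的质点欧拉积分一步: 执行器增益缩放控制力, 摩擦阻尼速度."""
    accel = (params['actuator_gain'] * force
             - params['friction'] * vel) / params['mass']
    accel = accel.copy()
    accel[2] -= params['gravity']            # 重力作用在z轴
    vel = vel + accel * dt
    pos = pos + vel * dt
    return pos, vel

rng = np.random.default_rng(0)
params = randomize_params(rng)               # 每个episode重置时调用一次
pos, vel = np.zeros(3), np.zeros(3)
for _ in range(10):
    pos, vel = step_point_mass(pos, vel, np.zeros(3), params)
```

同一个控制序列在不同采样参数下会产生不同轨迹, 这正是域随机化迫使策略学到鲁棒行为的来源。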

脚本六:Isaac Gym并行仿真

Python

"""
Script: isaac_gym_parallel.py
Content: 5.2.3.1 Isaac Gym并行加速
Usage: 直接运行 python isaac_gym_parallel.py
功能: 模拟GPU并行环境执行,可视化并行扩展性与吞吐量
"""

import numpy as np
import matplotlib.pyplot as plt
import time

class ParallelEnvSimulator:
    """并行环境模拟器 (模拟Isaac Gym)"""
    
    def __init__(self, n_envs=1024, state_dim=136, action_dim=6):
        self.n_envs = n_envs
        self.state_dim = state_dim
        self.action_dim = action_dim
        
        # 批量状态
        self.states = np.random.randn(n_envs, state_dim)
        self.objects_pos = np.random.randn(n_envs, 3) * 0.1 + 0.5
        
    def reset(self):
        """重置所有环境"""
        self.states = np.random.randn(self.n_envs, self.state_dim)
        return self.states
    
    def step(self, actions):
        """
        并行步进所有环境
        actions: [n_envs, action_dim]
        """
        # 模拟物理计算
        # 更新状态 (简化动力学)
        self.states[:, :3] += actions[:, :3] * 0.01  # 位置更新
        
        # 计算奖励 (向量化)
        distances = np.linalg.norm(self.states[:, :3] - self.objects_pos, axis=1)
        rewards = -distances
        
        # 检查终止 (向量化)
        dones = distances < 0.02
        
        return self.states, rewards, dones
    
    def simulate_epoch(self, steps=100):
        """模拟一个训练周期"""
        start_time = time.time()
        
        for _ in range(steps):
            actions = np.random.randn(self.n_envs, self.action_dim)
            _, _, _ = self.step(actions)
        
        elapsed = time.time() - start_time
        throughput = (self.n_envs * steps) / elapsed
        
        return elapsed, throughput

def visualize_parallel_sim():
    """可视化并行仿真性能"""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # 子图1: 并行扩展性
    ax1 = axes[0, 0]
    env_counts = [1, 8, 32, 128, 512, 1024, 4096]
    throughputs = []
    times_per_step = []
    
    for n_env in env_counts:
        sim = ParallelEnvSimulator(n_envs=n_env)
        elapsed, throughput = sim.simulate_epoch(steps=50)
        throughputs.append(throughput)
        times_per_step.append(elapsed / 50 * 1000)  # ms per step
    
    ax1.semilogx(env_counts, throughputs, 'b-o', linewidth=2, markersize=8)
    ax1.set_xlabel('Number of Parallel Environments')
    ax1.set_ylabel('Throughput (steps/sec)')
    ax1.set_title('Parallel Scaling (Simulated)', fontweight='bold')
    ax1.grid(True, alpha=0.3, which='both')
    
    # 添加理想线性扩展线
    ideal = [throughputs[0] * (n / env_counts[0]) for n in env_counts]
    ax1.semilogx(env_counts, ideal, 'r--', linewidth=1, alpha=0.5, label='Ideal Linear')
    ax1.legend()
    
    # 子图2: 每步时间
    ax2 = axes[0, 1]
    ax2.bar(range(len(env_counts)), times_per_step, color='steelblue', alpha=0.7)
    ax2.set_xticks(range(len(env_counts)))
    ax2.set_xticklabels([str(e) for e in env_counts], rotation=45)
    ax2.set_ylabel('Time per Step (ms)')
    ax2.set_title('Step Time vs Parallelism', fontweight='bold')
    ax2.grid(True, alpha=0.3, axis='y')
    
    # 子图3: 内存占用估计
    ax3 = axes[1, 0]
    # 假设每个环境状态占用内存
    bytes_per_env = 136 * 4 + 6 * 4  # float32
    memory_mb = [n * bytes_per_env / (1024**2) for n in env_counts]
    
    ax3.semilogy(env_counts, memory_mb, 'g-s', linewidth=2, markersize=8)
    ax3.set_xlabel('Number of Environments')
    ax3.set_ylabel('Memory Usage (MB)')
    ax3.set_title('Memory Scaling', fontweight='bold')
    ax3.grid(True, alpha=0.3, which='both')
    
    # 子图4: GPU利用率模拟
    ax4 = axes[1, 1]
    n_envs_gpu = np.array([128, 256, 512, 1024, 2048, 4096, 8192])
    # 模拟GPU利用率随并行度增加
    utilization = 100 * (1 - np.exp(-n_envs_gpu / 2000))
    
    ax4.plot(n_envs_gpu, utilization, 'purple', linewidth=3)
    ax4.axhline(y=95, color='red', linestyle='--', label='Target Utilization')
    ax4.fill_between(n_envs_gpu, utilization, alpha=0.3, color='purple')
    ax4.set_xlabel('Parallel Environments')
    ax4.set_ylabel('GPU Utilization (%)')
    ax4.set_title('GPU Utilization Scaling', fontweight='bold')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('isaac_gym_parallel.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print("Isaac Gym并行仿真可视化完成")
    print(f"最大吞吐量: {max(throughputs):.0f} steps/sec")
    print(f"最佳并行数: {env_counts[np.argmax(throughputs)]}")

if __name__ == "__main__":
    visualize_parallel_sim()
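脚本六吞吐量的来源是向量化: 对所有环境的一次批量 NumPy 更新替代了 Python 逐环境循环 (Isaac Gym 进一步把这一批量计算放到 GPU 上)。下面用与脚本中 `step` 相同逻辑的最小对照说明两者数值等价 (函数名为本文补充的示例):

```python
import numpy as np

def step_batched(states, actions, obj_pos):
    """批量版本: 一次数组运算更新所有环境."""
    out = states.copy()
    out[:, :3] += actions[:, :3] * 0.01
    dist = np.linalg.norm(out[:, :3] - obj_pos, axis=1)
    return out, -dist, dist < 0.02

def step_looped(states, actions, obj_pos):
    """逐环境循环版本: 语义相同, 但受解释器循环开销限制."""
    out = states.copy()
    rewards = np.empty(len(states))
    dones = np.empty(len(states), dtype=bool)
    for i in range(len(states)):
        out[i, :3] += actions[i, :3] * 0.01
        d = np.linalg.norm(out[i, :3] - obj_pos[i])
        rewards[i] = -d
        dones[i] = d < 0.02
    return out, rewards, dones
```

两个版本输出完全一致, 但批量版本没有逐环境的解释器开销, 环境数越多差距越明显; 这也是脚本六中吞吐量随并行环境数增长的根本原因。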