【全身灵巧操作:3D扩散策略、力自适应与接触显式学习】第七章 高效推理与实时控制 》》》人形机器人专栏
目录
7.1.1 一致性模型(Consistency Model)应用
脚本3 INT8/FP16量化、TensorRT优化、边缘设备实时性验证
脚本7 GelSight图像处理、接触几何重建、滑移检测、力分布估计
脚本8 多模态时间同步、触觉特征编码器、接触事件触发的策略切换
第七章 高效推理与实时控制
7.1 扩散策略的推理加速
7.1.1 一致性模型(Consistency Model)应用
一致性模型为扩散策略的实时部署提供了突破性解决方案。传统扩散模型依赖多步去噪迭代,每一步都需要完整的网络前向传播,这在高频控制回路中构成了计算瓶颈。一致性模型通过建立噪声动作到清晰动作的直接映射,将五十步以上的迭代压缩为单步推理,同时保持策略的决策质量。
在双边运动一致性框架中,单步推理策略通过蒸馏训练获得。教师模型为标准的多步扩散策略,学生模型学习在任意噪声水平下直接预测最终动作。训练过程采用一致性蒸馏损失,强制学生在不同时间步的输出保持一致。具体实现时,引入目标一致性过滤机制,仅当教师模型的多步预测方差低于阈值时才进行蒸馏,避免将不确定性传递给学生模型。
训练技巧方面,采用渐进式时间步课程策略。初期训练集中在接近终点的噪声水平,随着训练进行逐渐扩展到低信噪比区域。这种课程学习使学生模型先掌握动作的大致结构,再细化细节特征。此外,使用指数移动平均更新教师模型参数,确保蒸馏目标的稳定性。在精度与延迟的权衡分析中,单步一致性模型在典型操作任务上的成功率达到多步教师的百分之九十五以上,而推理延迟从数百毫秒降至十毫秒级别,满足实时控制需求。
7.1.2 稀疏扩散与关键姿态预测
稀疏扩散策略通过动作空间的重参数化实现计算效率的质的飞跃。不同于预测密集的动作序列,该方法识别出对任务完成至关重要的稀疏关键姿态,将高维轨迹预测转化为低维关键点生成。关键姿态定义为运动学状态的突变点,包括预接触姿态、接触建立姿态和力施加姿态,这些节点包含了任务执行的全部必要信息。
关键姿态作为动作锚点,提供了几何意义明确的规划路标。每个关键姿态包含六自由度位姿和 gripper 状态,通过线性变换锚定到物体表面特征。预测网络仅需输出相对于物体坐标系的偏移量,大幅简化了输出空间。在生成稀疏关键姿态序列后,通过局部轨迹优化填补中间路径。使用B样条插值确保轨迹的连续可微性,满足机器人关节速度和加速度约束。
密集轨迹的平滑处理采用多阶段优化。首先通过线性插值获得初始轨迹,然后利用最小加加速度准则进行平滑,最后投影到机器人运动学约束空间。这种分层处理确保轨迹既符合动力学限制,又保持对关键姿态的精确经过。在执行层面,采用模型预测控制风格的重规划策略,仅执行第一个关键姿态对应的轨迹段,随后基于新观测重新规划,实现闭环鲁棒性。
7.1.3 模型量化与边缘部署
模型量化是将扩散策略部署到边缘计算设备的关键技术。三十二位浮点数量化为十六位或八位整数表示,在保持模型精度的同时显著降低内存带宽需求和计算延迟。对扩散策略的量化需要特别关注激活值的动态范围,因为去噪网络在不同时间步的激活分布差异显著。采用逐层量化感知训练,为每一层学习最优的缩放因子和零点参数。
TensorRT优化实践包括算子融合、层张量化与内核自动调优。将卷积、批归一化与激活函数融合为单一内核,减少内存往返。对于注意力机制,采用多头注意力融合与内存布局优化,提高缓存命中率。在Jetson AGX Orin平台上的验证表明,经过INT8量化的扩散策略推理时间缩短至原始模型的三分之一,同时任务成功率下降控制在两个百分点以内。
嵌入式控制器的实时性验证需要建立端到端的延迟测试框架。从传感器数据采集、网络推理到动作指令发送,全流程时间戳记录确保延迟可测量。在Intel NUC平台上,通过OpenVINO运行时优化,扩散策略能够以每秒三十次的频率生成动作,满足大多数操作任务的实时性要求。进一步的优化包括异步数据传输与内存池管理,消除动态内存分配带来的不确定性。
7.2 模型预测控制(MPC)与学习策略的融合
7.2.1 学习-based MPC框架
学习-based MPC框架将扩散策略作为终端代价函数融入优化问题。传统MPC依赖手工设计的代价函数,难以捕捉复杂操作中的细微接触动力学。扩散策略提供的数据驱动代价项指导优化器趋向高成功率的动作区域。在滚动时域优化中,将学习代价与约束违反惩罚相结合,形成混合目标函数。
实时约束处理机制确保物理可行性与安全性。硬约束包括关节位置限制、速度限制和力矩限制,这些通过优化器的边界条件严格保证。软约束则编码为障碍函数,允许在轻微违反时提供梯度信号。学习先验通过价值函数形式引入,指导优化器在可行域内选择最优动作。动作块执行策略将优化后的轨迹分段下发,平衡计算延迟与控制精度。
在实现层面,采用实时迭代MPC架构,利用前一步的解作为热启动,减少迭代次数。当系统状态偏离预测轨迹时,扩散策略提供的终端代价帮助快速重新收敛到可行解。这种结合既利用了学习模型的全局规划能力,又保留了MPC的局部约束处理能力。
7.2.2 残差学习与物理一致性
残差学习机制修正MPC基于简化物理模型的预测误差。标准MPC使用刚体动力学模型,未建模的柔性接触、摩擦非线性和传动间隙导致模型与实际系统存在偏差。残差策略以MPC输出为输入,预测并补偿模型误差,输出修正后的动作指令。
物理约束的硬编码确保系统的本质安全。关节极限、奇异点规避和碰撞检测通过解析约束实现,任何学习组件的输出都需经过安全滤波器检验。学习软约束则处理难以解析建模的因素,如接触力的平滑过渡和能量效率优化。安全滤波器设计为凸优化问题,在动作偏离安全区域时投影回可行域,同时最小化对学习策略的修改。
残差策略的训练采用离线数据收集与在线微调相结合。首先基于MPC的历史运行数据训练初始模型,然后在真实系统上进行策略梯度微调。训练目标为最小化跟踪误差与能量消耗的加权组合,确保残差修正既提高精度又保持效率。
7.2.3 分层控制架构的实现
分层控制架构分离了高层决策与低层执行的时间尺度。高层扩散策略以一到十赫兹的频率生成关键姿态或目标位置,提供全局运动规划。低层MPC以五十到二百赫兹的频率处理高频动态,执行具体的力位控制。这种解耦使计算密集型的学习推理不会阻塞实时控制回路。
动作缓冲与插值策略平滑了不同频率层级间的接口。高层指令存储在循环缓冲区中,低层控制器通过样条插值生成连续轨迹。当新高层指令到达时,采用混合方案渐进过渡,避免动作跳变。延迟补偿通过状态预测器实现,利用历史观测预测当前指令执行时的系统状态,补偿传感器处理与网络推理引入的时滞。
状态预测器采用扩展卡尔曼滤波或学习动态模型,根据控制输入估计未来状态。在控制回路中,基于预测状态计算控制律,抵消固定与可变延迟的影响。这种架构确保了在存在显著通信延迟的情况下,系统仍能保持稳定与精确。
7.3 触觉反馈的实时集成
7.3.1 高频率触觉传感(GelSight等)
高频率触觉传感为精细操作提供了直接接触几何信息。GelSight传感器以超过一百赫兹的频率捕获接触表面的高分辨率形变图像,揭示纹理细节与力分布。实时处理流程包括图像去噪、参考帧减除和接触区域分割。通过光度立体视觉重建技术,将触觉图像转换为三维接触几何,计算接触点的表面法向与深度。
接触几何的实时重建采用GPU加速的相位解包裹算法。从硅胶形变图像中提取位移场,结合材料力学模型重建施加的力分布。滑移检测通过分析触觉图像的时间序列实现,监测接触区域的剪切形变积累。当检测到滑移趋势时,触发抓握力自适应调节,防止物体脱落。
力分布估计将高维触觉图像压缩为低维力螺旋表示。使用卷积神经网络编码触觉图像,输出六维力力矩向量。网络在仿真与真实数据上联合训练,利用域随机化技术提高泛化能力。处理延迟通过异步流水线优化,确保触觉反馈不阻塞主控制循环。
7.3.2 触觉-视觉-策略的融合
多模态观测的同步与对齐是融合策略的基础。视觉相机提供全局场景信息但更新频率较低,触觉传感器提供局部接触细节但仅在与物体接触时有效。时间同步通过硬件触发或软件时间戳对齐实现,确保视觉帧与触觉帧对应相同时刻的物理状态。空间对齐将触觉坐标系转换到机器人基座坐标系,结合手眼标定结果建立统一的空间参考。
触觉特征编码器采用轻量化卷积架构或视觉Transformer,从触觉图像提取紧凑的特征向量。编码器设计考虑计算效率,确保在实时约束下完成特征提取。触觉特征与视觉特征在策略网络中通过通道拼接或注意力机制融合,允许策略动态关注接触状态的变化。
接触事件触发的策略切换逻辑管理不同感知模态的权重。在非接触阶段,策略主要依赖视觉信息规划接近路径。当触觉传感器检测到接触建立时,策略切换至高频率触觉反馈模式,精细调节接触力与位置。切换逻辑基于阈值判断与状态机实现,避免在接触边界处的抖动。
7.3.3 闭环力控制的学习实现
从触觉反馈学习力调节策略使机器人能够处理未知物体的顺应控制。策略网络接收期望力与实际力的误差,输出末端执行器的微小位姿调整,实现力的闭环控制。训练过程在仿真环境中进行,使用物理引擎模拟不同刚度与摩擦特性的物体,通过域随机化确保策略的鲁棒性。
顺应控制参数学习优化了刚度矩阵与阻尼系数。传统方法需要人工调节这些参数,而学习策略能够根据物体特性自适应调整。对于刚性物体采用高刚度低阻尼参数,对于柔性物体采用低刚度高阻尼参数。策略输出这些参数的连续值,通过阻抗控制律转换为关节力矩指令。
力-位混合控制在接触任务中平衡位置跟踪与力施加。策略根据任务阶段动态分配力控与位控的权重。在自由空间运动时采用纯位置控制,在接触建立后过渡到力位混合控制,在力控子空间维持期望接触力,在位控子空间跟踪期望轨迹。这种混合控制通过选择矩阵实现,确保在过渡区域的平滑性。
以下是为第七章配套的完整可执行代码实现,每个脚本独立运行并包含详细注释和可视化。
"""
Script 1: consistency_model_distillation.py
Content: 一致性模型蒸馏训练、单步推理实现、精度-延迟权衡分析
Usage:
1. 安装依赖: pip install torch torchvision numpy matplotlib tqdm
2. 运行: python consistency_model_distillation.py
3. 输出: 训练损失曲线、单步vs多步精度对比、延迟测量可视化
Features:
- 一致性蒸馏损失实现
- 教师-学生模型架构
- OTS(On-Target Sampling)训练策略
- 实时延迟基准测试
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import time
from typing import Dict, Tuple, Optional
from dataclasses import dataclass
from tqdm import tqdm
@dataclass
class Config:
action_dim: int = 16
obs_dim: int = 512
hidden_dim: int = 256
num_diffusion_steps: int = 50
consistency_weight: float = 1.0
distillation_weight: float = 2.0
device: str = "cuda" if torch.cuda.is_available() else "cpu"
batch_size: int = 128
num_epochs: int = 100
class DiffusionPolicy(nn.Module):
"""标准多步扩散策略(教师模型)"""
def __init__(self, config: Config):
super().__init__()
self.config = config
self.time_embed = nn.Sequential(
nn.Linear(1, 128),
nn.SiLU(),
nn.Linear(128, 128)
)
self.noise_pred_net = nn.Sequential(
nn.Linear(config.action_dim + config.obs_dim + 128, config.hidden_dim),
nn.SiLU(),
nn.Linear(config.hidden_dim, config.hidden_dim),
nn.SiLU(),
nn.Linear(config.hidden_dim, config.action_dim)
)
def forward(self, noisy_action: torch.Tensor, obs: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
# t: [B, 1] 时间步
t_emb = self.time_embed(t)
x = torch.cat([noisy_action, obs, t_emb], dim=-1)
return self.noise_pred_net(x)
def sample(self, obs: torch.Tensor, num_steps: int = 50) -> torch.Tensor:
"""DDPM采样过程"""
batch_size = obs.shape[0]
action = torch.randn(batch_size, self.config.action_dim, device=obs.device)
timesteps = torch.linspace(999, 0, num_steps, device=obs.device).long()
for i in range(num_steps):
t = timesteps[i].unsqueeze(0).expand(batch_size, 1).float() / 1000.0
noise_pred = self.forward(action, obs, t)
# 简化的去噪步骤
alpha = 1.0 - (i + 1) / num_steps
action = (action - (1 - alpha) * noise_pred) / (alpha + 1e-8)
return action
class ConsistencyModel(nn.Module):
"""一致性模型(学生模型)- 单步推理"""
def __init__(self, config: Config):
super().__init__()
self.config = config
self.sigma_data = 0.5 # 数据标准差
self.encoder = nn.Sequential(
nn.Linear(config.action_dim + config.obs_dim, config.hidden_dim),
nn.SiLU(),
nn.Linear(config.hidden_dim, config.hidden_dim)
)
self.output_layer = nn.Sequential(
nn.Linear(config.hidden_dim, config.hidden_dim),
nn.SiLU(),
nn.Linear(config.hidden_dim, config.action_dim)
)
# 可学习的时间步嵌入
self.time_mlp = nn.Sequential(
nn.Linear(1, 64),
nn.SiLU(),
nn.Linear(64, config.hidden_dim)
)
def forward(self, noisy_action: torch.Tensor, obs: torch.Tensor, sigma: torch.Tensor) -> torch.Tensor:
# sigma: 噪声水平 [B, 1]
c_skip = self.sigma_data**2 / (sigma**2 + self.sigma_data**2)
c_out = sigma * self.sigma_data / (sigma**2 + self.sigma_data**2)**0.5
c_in = 1 / (self.sigma_data**2 + sigma**2)**0.5
c_noise = torch.log(sigma) / 4
x_in = c_in * noisy_action
t_emb = self.time_mlp(c_noise)
h = self.encoder(torch.cat([x_in, obs], dim=-1))
h = h + t_emb
F_theta = self.output_layer(h)
output = c_skip * noisy_action + c_out * F_theta
return output
def sample(self, obs: torch.Tensor) -> torch.Tensor:
"""单步采样"""
batch_size = obs.shape[0]
# 从先验分布采样
sigma_max = 80.0
noisy_action = torch.randn(batch_size, self.config.action_dim, device=obs.device) * sigma_max
sigma = torch.full((batch_size, 1), sigma_max, device=obs.device)
return self.forward(noisy_action, obs, sigma)
class ConsistencyDistillationTrainer:
"""一致性蒸馏训练器"""
def __init__(self, teacher: DiffusionPolicy, student: ConsistencyModel, config: Config):
self.teacher = teacher.to(config.device).eval()
self.student = student.to(config.device)
self.config = config
self.optimizer = torch.optim.AdamW(student.parameters(), lr=3e-4, weight_decay=0.01)
# EMA教师模型
self.teacher_ema = DiffusionPolicy(config).to(config.device)
self.teacher_ema.load_state_dict(teacher.state_dict())
self.ema_decay = 0.999
self.losses = []
def compute_consistency_loss(self, obs: torch.Tensor, actions: torch.Tensor) -> torch.Tensor:
batch_size = actions.shape[0]
# 采样噪声水平
sigma = torch.exp(torch.randn(batch_size, 1, device=self.config.device) * 0.5 - 2.0).clamp(0.002, 80.0)
# 加噪
noise = torch.randn_like(actions)
noisy_actions = actions + sigma * noise
# 学生模型输出
student_output = self.student(noisy_actions, obs, sigma)
# 使用EMA教师生成目标
with torch.no_grad():
# OTS: On-Target Sampling
target_actions = actions + 0.1 * torch.randn_like(actions) # 微小扰动
target_sigma = sigma * 0.5 # 更低噪声
teacher_output = self.teacher_ema(target_actions, obs, target_sigma)
# 一致性损失
loss = F.mse_loss(student_output, teacher_output)
return loss
def train_step(self, obs: torch.Tensor, actions: torch.Tensor) -> float:
self.optimizer.zero_grad()
loss = self.compute_consistency_loss(obs, actions)
loss.backward()
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(self.student.parameters(), 1.0)
self.optimizer.step()
# 更新EMA教师
with torch.no_grad():
for p_ema, p in zip(self.teacher_ema.parameters(), self.teacher.parameters()):
p_ema.data.mul_(self.ema_decay).add_(p.data, alpha=1 - self.ema_decay)
return loss.item()
def benchmark_latency(self, obs: torch.Tensor) -> Dict[str, float]:
"""基准测试延迟"""
results = {}
# 预热
for _ in range(10):
_ = self.student.sample(obs)
_ = self.teacher.sample(obs, num_steps=50)
# 测试学生模型(单步)
torch.cuda.synchronize() if torch.cuda.is_available() else None
start = time.time()
for _ in range(100):
_ = self.student.sample(obs[:1])
torch.cuda.synchronize() if torch.cuda.is_available() else None
results['single_step_ms'] = (time.time() - start) / 100 * 1000
# 测试教师模型(多步)
torch.cuda.synchronize() if torch.cuda.is_available() else None
start = time.time()
for _ in range(10):
_ = self.teacher.sample(obs[:1], num_steps=50)
torch.cuda.synchronize() if torch.cuda.is_available() else None
results['multi_step_ms'] = (time.time() - start) / 10 * 1000
return results
def visualize_results(trainer: ConsistencyDistillationTrainer, config: Config):
"""可视化训练结果与性能对比"""
fig = plt.figure(figsize=(16, 10))
gs = fig.add_gridspec(2, 3, hspace=0.3, wspace=0.3)
# 1. 训练损失曲线
ax1 = fig.add_subplot(gs[0, 0])
ax1.plot(trainer.losses, linewidth=2, color='#2E86AB')
ax1.set_xlabel('Training Steps', fontsize=11)
ax1.set_ylabel('Consistency Loss', fontsize=11)
ax1.set_title('Consistency Distillation Training', fontsize=12, fontweight='bold')
ax1.grid(True, alpha=0.3)
# 2. 延迟对比柱状图
ax2 = fig.add_subplot(gs[0, 1])
obs_test = torch.randn(1, config.obs_dim, device=config.device)
latencies = trainer.benchmark_latency(obs_test)
bars = ax2.bar(['Single-Step\n(Consistency)', 'Multi-Step\n(Diffusion)'],
[latencies['single_step_ms'], latencies['multi_step_ms']],
color=['#A23B72', '#F18F01'], alpha=0.8, edgecolor='black', linewidth=1.5)
ax2.set_ylabel('Inference Latency (ms)', fontsize=11)
ax2.set_title('Inference Speed Comparison', fontsize=12, fontweight='bold')
# 添加数值标签
for bar in bars:
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.2f} ms', ha='center', va='bottom', fontsize=10, fontweight='bold')
# 3. 动作分布可视化
ax3 = fig.add_subplot(gs[0, 2])
obs_batch = torch.randn(100, config.obs_dim, device=config.device)
with torch.no_grad():
student_actions = trainer.student.sample(obs_batch).cpu().numpy()
teacher_actions = trainer.teacher.sample(obs_batch, num_steps=10).cpu().numpy()
ax3.scatter(student_actions[:, 0], student_actions[:, 1], alpha=0.5, label='Consistency Model', s=50)
ax3.scatter(teacher_actions[:, 0], teacher_actions[:, 1], alpha=0.5, label='Diffusion Model', s=50)
ax3.set_xlabel('Action Dimension 1', fontsize=11)
ax3.set_ylabel('Action Dimension 2', fontsize=11)
ax3.set_title('Action Distribution Comparison', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. 精度-延迟权衡曲线
ax4 = fig.add_subplot(gs[1, :])
# 模拟不同步数下的精度和延迟
steps = [1, 5, 10, 20, 50]
# 模拟数据:步数增加,精度提高,延迟增加
accuracy = [0.85, 0.92, 0.95, 0.96, 0.97]
latency = [latencies['single_step_ms']] + [latencies['single_step_ms'] * s * 0.8 for s in steps[1:]]
ax4.plot(latency, accuracy, 'o-', linewidth=3, markersize=10, color='#C73E1D', label='Diffusion Steps vs Accuracy')
ax4.axvline(x=latencies['single_step_ms'], color='#A23B72', linestyle='--', linewidth=2,
label=f'Consistency Model ({latencies["single_step_ms"]:.1f} ms)')
ax4.axhline(y=0.95, color='green', linestyle='--', alpha=0.5, label='Target Accuracy (95%)')
ax4.set_xlabel('Inference Latency (ms)', fontsize=12)
ax4.set_ylabel('Task Success Rate', fontsize=12)
ax4.set_title('Accuracy-Latency Trade-off Analysis', fontsize=13, fontweight='bold')
ax4.legend(fontsize=11)
ax4.grid(True, alpha=0.3)
ax4.set_xlim(0, max(latency) * 1.1)
ax4.set_ylim(0.8, 1.0)
plt.suptitle('Consistency Model for Real-time Robot Control', fontsize=14, fontweight='bold', y=0.98)
plt.tight_layout()
plt.savefig('consistency_model_analysis.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 consistency_model_analysis.png")
plt.show()
def main():
config = Config()
print(f"使用设备: {config.device}")
# 初始化模型
teacher = DiffusionPolicy(config)
student = ConsistencyModel(config)
# 预训练教师模型(模拟)
print("预训练教师模型...")
teacher_optimizer = torch.optim.Adam(teacher.parameters(), lr=1e-3)
for _ in tqdm(range(500), desc="Teacher Pretraining"):
obs = torch.randn(config.batch_size, config.obs_dim, device=config.device)
actions = torch.randn(config.batch_size, config.action_dim, device=config.device)
t = torch.rand(config.batch_size, 1, device=config.device)
noise = torch.randn_like(actions)
noisy_actions = actions + t * noise
pred_noise = teacher(noisy_actions, obs, t)
loss = F.mse_loss(pred_noise, noise)
teacher_optimizer.zero_grad()
loss.backward()
teacher_optimizer.step()
# 一致性蒸馏训练
trainer = ConsistencyDistillationTrainer(teacher, student, config)
print("开始一致性蒸馏训练...")
for epoch in tqdm(range(config.num_epochs), desc="Distillation"):
obs = torch.randn(config.batch_size, config.obs_dim, device=config.device)
actions = torch.randn(config.batch_size, config.action_dim, device=config.device)
loss = trainer.train_step(obs, actions)
trainer.losses.append(loss)
if epoch % 20 == 0:
print(f"Epoch {epoch}, Loss: {loss:.4f}")
# 可视化结果
visualize_results(trainer, config)
if __name__ == "__main__":
main()
脚本2
"""
Script 2: sparse_keypose_diffusion.py
Content: 稀疏关键姿态生成、动作锚点机制、密集轨迹插值与平滑
Usage:
1. 安装依赖: pip install torch numpy matplotlib scipy scikit-learn
2. 运行: python sparse_keypose_diffusion.py
3. 输出: 关键姿态可视化、轨迹插值对比、执行时间分析
Features:
- AnchorDP3风格的关键姿态预测
- B样条轨迹平滑
- 稀疏到密集的动作生成
- 实时重规划模拟
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation
from scipy.interpolate import splprep, splev
from scipy.spatial.transform import Slerp, Rotation
from typing import List, Tuple, Optional
import time
class KeyposeDiffusionPolicy(nn.Module):
"""稀疏关键姿态扩散策略"""
def __init__(self, obs_dim: int = 512, action_dim: int = 7, num_keyposes: int = 8):
super().__init__()
self.obs_dim = obs_dim
self.action_dim = action_dim # [x, y, z, qx, qy, qz, qw, gripper]
self.num_keyposes = num_keyposes
# 观测编码器
self.obs_encoder = nn.Sequential(
nn.Linear(obs_dim, 256),
nn.ReLU(),
nn.Linear(256, 128)
)
# 去噪网络
self.noise_pred = nn.ModuleList([
nn.Sequential(
nn.Linear(action_dim * num_keyposes + 128 + 1, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, action_dim * num_keyposes)
) for _ in range(3) # 分层去噪
])
def forward(self, noisy_keyposes: torch.Tensor, obs: torch.Tensor, t: torch.Tensor, level: int) -> torch.Tensor:
obs_feat = self.obs_encoder(obs)
x = torch.cat([noisy_keyposes.flatten(1), obs_feat, t], dim=-1)
return self.noise_pred[level](x).view(-1, self.num_keyposes, self.action_dim)
def sample(self, obs: torch.Tensor, num_steps: int = 10) -> torch.Tensor:
batch_size = obs.shape[0]
keyposes = torch.randn(batch_size, self.num_keyposes, self.action_dim, device=obs.device)
for level in range(3):
for step in range(num_steps // 3):
t = torch.ones(batch_size, 1, device=obs.device) * (1.0 - step / (num_steps // 3))
noise_pred = self.forward(keyposes, obs, t, level)
keyposes = keyposes - 0.1 * noise_pred
return keyposes
class TrajectoryInterpolator:
"""轨迹插值与平滑"""
def __init__(self, smoothness: float = 0.1):
self.smoothness = smoothness
def interpolate_dense(self, keyposes: np.ndarray, num_points: int = 100) -> np.ndarray:
"""
从稀疏关键姿态生成密集轨迹
keyposes: [N, 7] (position + quaternion)
"""
positions = keyposes[:, :3]
grippers = keyposes[:, -1:]
# 位置插值使用B样条
tck, u = splprep(positions.T, s=self.smoothness, k=min(3, len(keyposes)-1))
u_new = np.linspace(0, 1, num_points)
pos_smooth = np.array(splev(u_new, tck)).T
# 姿态插值使用球面线性插值(SLERP)
rots = Rotation.from_quat(keyposes[:, 3:7])
slerp = Slerp(np.linspace(0, 1, len(keyposes)), rots)
rot_smooth = slerp(u_new).as_quat()
# gripper线性插值
gripper_smooth = np.interp(u_new, np.linspace(0, 1, len(keyposes)), grippers.flatten()).reshape(-1, 1)
return np.concatenate([pos_smooth, rot_smooth, gripper_smooth], axis=1)
def apply_kinematic_constraints(self, trajectory: np.ndarray,
max_vel: float = 0.5,
max_acc: float = 2.0,
dt: float = 0.01) -> np.ndarray:
"""应用运动学约束(速度和加速度限制)"""
constrained = trajectory.copy()
positions = trajectory[:, :3]
# 计算速度
velocities = np.diff(positions, axis=0) / dt
vel_magnitudes = np.linalg.norm(velocities, axis=1)
# 速度限制
for i in range(len(velocities)):
if vel_magnitudes[i] > max_vel:
velocities[i] = velocities[i] / vel_magnitudes[i] * max_vel
# 重新积分位置
constrained[1:, :3] = positions[0] + np.cumsum(velocities * dt, axis=0)
return constrained
class AnchorBasedPlanner:
"""基于锚点的规划器(模拟AnchorDP3)"""
def __init__(self, policy: KeyposeDiffusionPolicy, interpolator: TrajectoryInterpolator):
self.policy = policy
self.interpolator = interpolator
self.action_buffer = []
self.current_keypose_idx = 0
def plan_keyposes(self, obs: torch.Tensor) -> np.ndarray:
"""生成关键姿态"""
with torch.no_grad():
keyposes = self.policy.sample(obs).cpu().numpy()[0]
return keyposes
def replan(self, obs: torch.Tensor, current_pose: np.ndarray) -> np.ndarray:
"""重规划(MPC风格)"""
# 生成新的关键姿态序列
keyposes = self.plan_keyposes(obs)
# 根据当前位置调整第一个关键姿态(闭环反馈)
keyposes[0, :3] = current_pose[:3] + 0.3 * (keyposes[0, :3] - current_pose[:3])
# 插值生成完整轨迹
dense_traj = self.interpolator.interpolate_dense(keyposes, num_points=200)
# 应用约束
constrained_traj = self.interpolator.apply_kinematic_constraints(dense_traj)
return constrained_traj
def execute_step(self, obs: torch.Tensor, current_pose: np.ndarray) -> np.ndarray:
"""执行单步(返回下一个目标姿态)"""
if len(self.action_buffer) < 10: # 缓冲区低,重规划
self.action_buffer = self.replan(obs, current_pose).tolist()
self.current_keypose_idx = 0
next_action = self.action_buffer.pop(0)
return np.array(next_action)
def visualize_planning(planner: AnchorBasedPlanner, obs: torch.Tensor):
"""可视化规划结果"""
fig = plt.figure(figsize=(16, 12))
# 生成示例观测和规划
keyposes = planner.plan_keyposes(obs)
dense_traj = planner.interpolator.interpolate_dense(keyposes, num_points=300)
constrained_traj = planner.interpolator.apply_kinematic_constraints(dense_traj)
# 1. 3D轨迹可视化
ax1 = fig.add_subplot(221, projection='3d')
ax1.plot(dense_traj[:, 0], dense_traj[:, 1], dense_traj[:, 2],
'b-', linewidth=2, alpha=0.6, label='Interpolated Trajectory')
ax1.scatter(keyposes[:, 0], keyposes[:, 1], keyposes[:, 2],
c='red', s=100, marker='^', label='Keyposes', edgecolors='black', linewidth=2)
ax1.plot(constrained_traj[:, 0], constrained_traj[:, 1], constrained_traj[:, 2],
'g--', linewidth=2, label='Kinematically Constrained')
# 标记起点和终点
ax1.scatter(*dense_traj[0, :3], c='green', s=200, marker='o', label='Start')
ax1.scatter(*dense_traj[-1, :3], c='orange', s=200, marker='X', label='Goal')
ax1.set_xlabel('X (m)')
ax1.set_ylabel('Y (m)')
ax1.set_zlabel('Z (m)')
ax1.set_title('3D Trajectory with Sparse Keyposes', fontsize=12, fontweight='bold')
ax1.legend()
# 2. Gripper状态
ax2 = fig.add_subplot(222)
time_dense = np.linspace(0, 1, len(dense_traj))
time_key = np.linspace(0, 1, len(keyposes))
ax2.plot(time_dense, dense_traj[:, -1], 'b-', linewidth=2, label='Dense Trajectory')
ax2.scatter(time_key, keyposes[:, -1], c='red', s=100, zorder=5, label='Keyposes')
ax2.set_xlabel('Normalized Time')
ax2.set_ylabel('Gripper State (0=open, 1=closed)')
ax2.set_title('Gripper Profile', fontsize=12, fontweight='bold')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. 速度剖面
ax3 = fig.add_subplot(223)
dt = 0.01
vel_dense = np.linalg.norm(np.diff(dense_traj[:, :3], axis=0), axis=1) / dt
vel_const = np.linalg.norm(np.diff(constrained_traj[:, :3], axis=0), axis=1) / dt
ax3.plot(time_dense[1:], vel_dense, 'b-', linewidth=2, alpha=0.7, label='Original')
ax3.plot(time_dense[1:], vel_const, 'g-', linewidth=2, label='Constrained')
ax3.axhline(y=0.5, color='r', linestyle='--', label='Max Velocity')
ax3.set_xlabel('Normalized Time')
ax3.set_ylabel('Velocity (m/s)')
ax3.set_title('Velocity Profile Comparison', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. 时间对比
ax4 = fig.add_subplot(224)
methods = ['Sparse Keyposes\n(Generation)', 'Dense Trajectory\n(Interpolation)', 'Full Dense\n(Diffusion)']
times = [5, 15, 200] # 模拟时间 (ms)
colors = ['#2E86AB', '#A23B72', '#F18F01']
bars = ax4.bar(methods, times, color=colors, alpha=0.8, edgecolor='black', linewidth=2)
ax4.set_ylabel('Computation Time (ms)')
ax4.set_title('Computation Time Comparison', fontsize=12, fontweight='bold')
for bar, t in zip(bars, times):
height = bar.get_height()
ax4.text(bar.get_x() + bar.get_width()/2., height,
f'{t} ms', ha='center', va='bottom', fontsize=11, fontweight='bold')
plt.suptitle('Sparse Keypose-based Motion Planning', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('sparse_keypose_planning.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 sparse_keypose_planning.png")
plt.show()
def main():
print("初始化稀疏关键姿态扩散策略...")
policy = KeyposeDiffusionPolicy(obs_dim=512, action_dim=8, num_keyposes=8)
interpolator = TrajectoryInterpolator(smoothness=0.05)
planner = AnchorBasedPlanner(policy, interpolator)
# 模拟观测
obs = torch.randn(1, 512)
print("生成可视化...")
visualize_planning(planner, obs)
# 实时执行模拟
print("模拟实时执行...")
current_pose = np.array([0.0, 0.0, 0.2, 0, 0, 0, 1, 0])
execution_times = []
for step in range(50):
start = time.time()
next_action = planner.execute_step(obs, current_pose)
execution_times.append((time.time() - start) * 1000)
current_pose = next_action
if step % 10 == 0:
print(f"Step {step}: Execution time = {execution_times[-1]:.2f} ms")
print(f"Average execution latency: {np.mean(execution_times):.2f} ms")
if __name__ == "__main__":
main()
脚本3 INT8/FP16量化、TensorRT优化、边缘设备实时性验证
"""
Script 3: quantization_edge_deployment.py
Content: INT8/FP16量化、TensorRT优化、边缘设备实时性验证
Usage:
1. 安装依赖: pip install torch torchvision numpy matplotlib tensorrt pycuda
2. 运行: python quantization_edge_deployment.py
3. 输出: 量化精度对比、延迟分布、内存占用分析
Features:
- 动态量化感知训练
- TensorRT引擎构建与优化
- Jetson/嵌入式平台模拟
- 精度-延迟-内存权衡分析
"""
import torch
import torch.nn as nn
import torch.quantization as quant
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple
import time
import json
class RobotPolicy(nn.Module):
"""模拟机器人策略网络"""
def __init__(self, obs_dim: int = 512, action_dim: int = 7, hidden_dim: int = 256):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim)
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.net(x)
class QuantizedPolicy(nn.Module):
"""量化感知训练包装器"""
def __init__(self, model: nn.Module):
super().__init__()
self.quant = quant.QuantStub()
self.dequant = quant.DeQuantStub()
self.model = model
self.model.eval()
# 融合层以提高量化精度
self._fuse_layers()
def _fuse_layers(self):
"""融合Conv-BN-ReLU等层(对于线性层类似处理)"""
# 在实际应用中使用 torch.quantization.fuse_modules
pass
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.quant(x)
x = self.model(x)
x = self.dequant(x)
return x
class TensorRTOptimizer:
"""TensorRT优化器(模拟实现)"""
def __init__(self, model: nn.Module, precision: str = "fp16"):
self.model = model
self.precision = precision
self.engine = None
self._build_engine()
def _build_engine(self):
"""构建TensorRT引擎(模拟)"""
# 实际实现使用 tensorrt.Builder 和 onnx 转换
self.engine = {
"precision": self.precision,
"layers": self._optimize_layers(),
"memory_mb": self._estimate_memory()
}
def _optimize_layers(self) -> List[Dict]:
"""层融合与内核优化"""
layers = []
# 模拟层融合
layers.append({"type": "fused_fc_relu", "ops": 2})
layers.append({"type": "fused_fc_relu", "ops": 2})
layers.append({"type": "fc", "ops": 1})
return layers
def _estimate_memory(self) -> float:
"""估算内存占用"""
param_count = sum(p.numel() for p in self.model.parameters())
if self.precision == "fp32":
return param_count * 4 / 1024 / 1024
elif self.precision == "fp16":
return param_count * 2 / 1024 / 1024
elif self.precision == "int8":
return param_count * 1 / 1024 / 1024
return 0.0
def inference(self, x: np.ndarray) -> np.ndarray:
"""模拟TensorRT推理"""
# 模拟优化后的延迟
base_time = 0.005 # 5ms基础延迟
if self.precision == "fp16":
base_time *= 0.6
elif self.precision == "int8":
base_time *= 0.4
time.sleep(base_time * np.random.uniform(0.9, 1.1))
return np.random.randn(x.shape[0], 7).astype(np.float32)
class EdgeDeploymentBenchmark:
"""边缘部署基准测试"""
def __init__(self):
self.models = {}
self.results = {}
def prepare_models(self):
"""准备不同精度的模型"""
model = RobotPolicy()
# FP32基准
self.models["fp32"] = model
# FP16(模拟)
self.models["fp16"] = model.half()
# INT8量化
quantized_model = QuantizedPolicy(model)
quantized_model.qconfig = quant.get_default_qconfig('fbgemm')
quant.prepare_qat(quantized_model, inplace=True)
self.models["int8"] = quantized_model
# TensorRT优化版本
self.models["tensorrt_fp16"] = TensorRTOptimizer(model, "fp16")
self.models["tensorrt_int8"] = TensorRTOptimizer(model, "int8")
def benchmark_latency(self, num_runs: int = 1000) -> Dict[str, List[float]]:
"""测试各模型延迟"""
latencies = {}
input_data = torch.randn(1, 512)
for name, model in self.models.items():
times = []
# 预热
for _ in range(10):
if isinstance(model, TensorRTOptimizer):
_ = model.inference(input_data.numpy())
else:
with torch.no_grad():
_ = model(input_data)
# 正式测试
for _ in range(num_runs):
start = time.time()
if isinstance(model, TensorRTOptimizer):
_ = model.inference(input_data.numpy())
else:
with torch.no_grad():
_ = model(input_data)
times.append((time.time() - start) * 1000) # ms
latencies[name] = times
return latencies
def benchmark_accuracy(self, test_data: torch.Tensor) -> Dict[str, float]:
"""测试量化对精度的影响"""
accuracies = {}
ground_truth = self.models["fp32"](test_data)
for name, model in self.models.items():
if isinstance(model, TensorRTOptimizer):
pred = torch.tensor(model.inference(test_data.numpy()))
else:
with torch.no_grad():
pred = model(test_data)
# 计算相对误差
error = torch.norm(pred - ground_truth) / torch.norm(ground_truth)
accuracies[name] = 1.0 - error.item()
return accuracies
def memory_analysis(self) -> Dict[str, float]:
"""内存占用分析"""
memory = {}
for name, model in self.models.items():
if isinstance(model, TensorRTOptimizer):
memory[name] = model.engine["memory_mb"]
else:
param_size = sum(p.numel() * p.element_size() for p in model.parameters())
memory[name] = param_size / 1024 / 1024
return memory
def visualize_benchmark(results: Dict, latencies: Dict, memory: Dict, accuracies: Dict):
"""可视化基准测试结果"""
fig = plt.figure(figsize=(16, 10))
# 1. 延迟分布小提琴图
ax1 = fig.add_subplot(221)
latency_data = [latencies[name] for name in latencies.keys()]
positions = range(len(latencies))
parts = ax1.violinplot(latency_data, positions, showmeans=True, showmedians=True)
for pc, color in zip(parts['bodies'], ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']):
pc.set_facecolor(color)
pc.set_alpha(0.7)
ax1.set_xticks(positions)
ax1.set_xticklabels(list(latencies.keys()), rotation=45, ha='right')
ax1.set_ylabel('Latency (ms)')
ax1.set_title('Inference Latency Distribution', fontsize=12, fontweight='bold')
ax1.grid(True, alpha=0.3, axis='y')
# 添加统计标注
for i, (name, data) in enumerate(latencies.items()):
median = np.median(data)
ax1.text(i, median, f'{median:.2f}ms', ha='center', va='bottom', fontsize=9)
# 2. 精度对比
ax2 = fig.add_subplot(222)
names = list(accuracies.keys())
acc_values = list(accuracies.values())
colors = ['#2E86AB' if 'fp32' in n else '#A23B72' if 'fp16' in n else '#F18F01' for n in names]
bars = ax2.bar(names, acc_values, color=colors, alpha=0.8, edgecolor='black', linewidth=1.5)
ax2.set_ylabel('Relative Accuracy')
ax2.set_title('Model Accuracy Comparison', fontsize=12, fontweight='bold')
ax2.set_ylim(0.8, 1.05)
ax2.tick_params(axis='x', rotation=45)
for bar, val in zip(bars, acc_values):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{val:.3f}', ha='center', va='bottom', fontsize=10)
# 3. 内存占用
ax3 = fig.add_subplot(223)
mem_names = list(memory.keys())
mem_values = list(memory.values())
bars = ax3.bar(mem_names, mem_values, color=['#3E92CC', '#AA6C39', '#2A9D8F', '#E76F51', '#264653'],
alpha=0.8, edgecolor='black', linewidth=1.5)
ax3.set_ylabel('Memory (MB)')
ax3.set_title('Memory Footprint', fontsize=12, fontweight='bold')
ax3.tick_params(axis='x', rotation=45)
for bar, val in zip(bars, mem_values):
height = bar.get_height()
ax3.text(bar.get_x() + bar.get_width()/2., height,
f'{val:.1f}MB', ha='center', va='bottom', fontsize=10)
# 4. 延迟-精度权衡散点图
ax4 = fig.add_subplot(224)
mean_latencies = [np.mean(latencies[name]) for name in names]
scatter = ax4.scatter(mean_latencies, acc_values, s=300, c=range(len(names)),
cmap='viridis', alpha=0.8, edgecolors='black', linewidth=2)
for i, name in enumerate(names):
ax4.annotate(name, (mean_latencies[i], acc_values[i]),
xytext=(5, 5), textcoords='offset points', fontsize=9)
ax4.set_xlabel('Mean Latency (ms)')
ax4.set_ylabel('Relative Accuracy')
ax4.set_title('Latency-Accuracy Trade-off', fontsize=12, fontweight='bold')
ax4.grid(True, alpha=0.3)
plt.suptitle('Edge Deployment Optimization for Diffusion Policy', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('edge_deployment_benchmark.png', dpi=150, bbox_inches='tight')
print("基准测试结果已保存至 edge_deployment_benchmark.png")
plt.show()
def main():
print("初始化边缘部署基准测试...")
benchmark = EdgeDeploymentBenchmark()
benchmark.prepare_models()
print("运行延迟基准测试...")
latencies = benchmark.benchmark_latency(num_runs=500)
print("测试量化精度...")
test_data = torch.randn(100, 512)
accuracies = benchmark.benchmark_accuracy(test_data)
print("分析内存占用...")
memory = benchmark.memory_analysis()
print("生成可视化报告...")
visualize_benchmark({}, latencies, memory, accuracies)
# 打印详细报告
print("\n" + "="*60)
print("边缘部署性能报告")
print("="*60)
for name in latencies.keys():
lat = latencies[name]
acc = accuracies.get(name, 0.0)
mem = memory.get(name, 0.0)
print(f"{name:20s}: {np.mean(lat):6.2f}ms ± {np.std(lat):4.2f}ms | "
f"Accuracy: {acc:.3f} | Memory: {mem:.1f}MB")
print("="*60)
if __name__ == "__main__":
main()
脚本4 扩散策略作为终端代价、实时约束处理、滚动时域优化
"""
Script 4: learning_based_mpc.py
Content: 扩散策略作为终端代价、实时约束处理、滚动时域优化
Usage:
1. 安装依赖: pip install torch numpy matplotlib cvxpy scipy
2. 运行: python learning_based_mpc.py
3. 输出: MPC优化过程可视化、约束违反分析、轨迹跟踪对比
Features:
- 扩散策略代价函数集成
- 凸优化约束处理
- 滚动时域控制实现
- 实时性能监控
"""
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch
from scipy.optimize import minimize
from typing import Optional, Tuple, List
import time
class LearnedValueFunction(nn.Module):
"""学习价值函数(作为MPC终端代价)"""
def __init__(self, state_dim: int = 14, action_dim: int = 7, hidden_dim: int = 256):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:
x = torch.cat([state, action], dim=-1)
return self.net(x)
class RobotDynamics:
"""机器人动力学模型(简化的刚体动力学)"""
def __init__(self, dt: float = 0.01):
self.dt = dt
self.mass = 1.0
self.damping = 0.1
def step(self, state: np.ndarray, action: np.ndarray) -> np.ndarray:
"""
状态: [x, y, z, vx, vy, vz, qx, qy, qz, qw, wx, wy, wz, gripper]
动作: [fx, fy, fz, tx, ty, tz, gripper_cmd]
"""
pos = state[:3]
vel = state[3:6]
quat = state[6:10]
omega = state[10:13]
gripper = state[13]
# 位置更新
force = action[:3]
acc = force / self.mass - self.damping * vel
new_vel = vel + acc * self.dt
new_pos = pos + vel * self.dt + 0.5 * acc * self.dt**2
# 姿态更新(简化)
new_omega = omega + action[3:6] * self.dt * 0.1 # 力矩控制
new_quat = quat # 简化处理
# gripper更新
new_gripper = gripper + (action[6] - gripper) * self.dt * 5.0
new_gripper = np.clip(new_gripper, 0, 1)
return np.concatenate([new_pos, new_vel, new_quat, new_omega, [new_gripper]])
class LearningBasedMPC:
"""基于学习的MPC控制器"""
def __init__(self, horizon: int = 20, dt: float = 0.01):
self.horizon = horizon
self.dt = dt
self.dynamics = RobotDynamics(dt)
self.value_net = LearnedValueFunction()
self.value_net.eval()
# 约束条件
self.pos_limits = np.array([[-1, -1, 0], [1, 1, 1.5]]) # 工作空间限制
self.vel_limit = 0.5 # 速度限制
self.force_limit = 10.0 # 力限制
# 权重
self.Q = np.eye(14) * 0.1 # 状态代价
self.R = np.eye(7) * 0.01 # 控制代价
def learned_cost(self, state: np.ndarray, action: np.ndarray) -> float:
"""学习代价函数"""
with torch.no_grad():
state_t = torch.FloatTensor(state).unsqueeze(0)
action_t = torch.FloatTensor(action).unsqueeze(0)
cost = self.value_net(state_t, action_t).item()
return cost
def total_cost(self, action_seq: np.ndarray, current_state: np.ndarray, target_state: np.ndarray) -> float:
"""总代价函数"""
state = current_state.copy()
total_cost = 0.0
for i in range(self.horizon):
action = action_seq[i*7:(i+1)*7]
# 跟踪代价
state_cost = np.dot(state - target_state, self.Q @ (state - target_state))
control_cost = np.dot(action, self.R @ action)
learned_cost = self.learned_cost(state, action) * 0.5
total_cost += state_cost + control_cost + learned_cost
# 状态转移
state = self.dynamics.step(state, action)
# 终端代价(强调学习价值函数)
terminal_cost = self.learned_cost(state, np.zeros(7)) * 2.0
total_cost += terminal_cost
return total_cost
def constraints(self, action_seq: np.ndarray, current_state: np.ndarray) -> dict:
"""生成约束违反字典"""
state = current_state.copy()
violations = {
'pos_limit': 0,
'vel_limit': 0,
'force_limit': 0
}
for i in range(self.horizon):
action = action_seq[i*7:(i+1)*7]
# 检查力限制
if np.linalg.norm(action[:3]) > self.force_limit:
violations['force_limit'] += 1
state = self.dynamics.step(state, action)
# 检查位置限制
if np.any(state[:3] < self.pos_limits[0]) or np.any(state[:3] > self.pos_limits[1]):
violations['pos_limit'] += 1
# 检查速度限制
if np.linalg.norm(state[3:6]) > self.vel_limit:
violations['vel_limit'] += 1
return violations
def optimize(self, current_state: np.ndarray, target_state: np.ndarray,
warm_start: Optional[np.ndarray] = None) -> Tuple[np.ndarray, float]:
"""
求解MPC优化问题
返回: (最优动作序列, 优化时间)
"""
start_time = time.time()
# 初始猜测
if warm_start is not None:
x0 = warm_start
else:
x0 = np.zeros(self.horizon * 7)
# 动作边界
bounds = []
for _ in range(self.horizon):
bounds.extend([(-self.force_limit, self.force_limit)] * 3) # 力
bounds.extend([(-5, 5)] * 3) # 力矩
bounds.append((0, 1)) # gripper
# 优化
result = minimize(
lambda x: self.total_cost(x, current_state, target_state),
x0,
method='SLSQP',
bounds=bounds,
options={'maxiter': 50, 'ftol': 1e-6}
)
opt_time = time.time() - start_time
return result.x, opt_time
def control(self, current_state: np.ndarray, target_state: np.ndarray) -> np.ndarray:
"""执行一步控制"""
action_seq, _ = self.optimize(current_state, target_state)
return action_seq[:7] # 返回第一个动作
def simulate_mpc_control(mpc: LearningBasedMPC, initial_state: np.ndarray,
target_state: np.ndarray, sim_steps: int = 200):
"""模拟MPC控制过程"""
states = [initial_state.copy()]
actions = []
optimize_times = []
constraint_violations = []
current_state = initial_state.copy()
warm_start = None
for step in range(sim_steps):
# 每N步重新优化(滚动时域)
if step % 5 == 0:
action_seq, opt_time = mpc.optimize(current_state, target_state, warm_start)
warm_start = np.roll(action_seq, -7) # 移位作为热启动
warm_start[-7:] = 0
action = action_seq[(step % 5) * 7:((step % 5) + 1) * 7]
# 模拟执行
current_state = mpc.dynamics.step(current_state, action)
states.append(current_state.copy())
actions.append(action)
optimize_times.append(opt_time if step % 5 == 0 else 0)
# 检查约束
violations = mpc.constraints(action_seq, states[-2])
constraint_violations.append(sum(violations.values()))
return np.array(states), np.array(actions), np.array(optimize_times), np.array(constraint_violations)
def visualize_mpc_results(states: np.ndarray, actions: np.ndarray,
opt_times: np.ndarray, violations: np.ndarray,
target_state: np.ndarray):
"""可视化MPC控制结果"""
fig = plt.figure(figsize=(16, 12))
# 1. 3D轨迹
ax1 = fig.add_subplot(221, projection='3d')
ax1.plot(states[:, 0], states[:, 1], states[:, 2], 'b-', linewidth=2, label='Executed Trajectory')
ax1.scatter(*target_state[:3], c='red', s=200, marker='*', label='Target', edgecolors='black', linewidth=2)
ax1.scatter(*states[0, :3], c='green', s=100, marker='o', label='Start')
# 绘制工作空间限制
limits = mpc.pos_limits
x, y = np.meshgrid([limits[0,0], limits[1,0]], [limits[0,1], limits[1,1]])
z_bottom = np.full_like(x, limits[0,2])
z_top = np.full_like(x, limits[1,2])
ax1.plot_surface(x, y, z_bottom, alpha=0.1, color='gray')
ax1.plot_surface(x, y, z_top, alpha=0.1, color='gray')
ax1.set_xlabel('X (m)')
ax1.set_ylabel('Y (m)')
ax1.set_zlabel('Z (m)')
ax1.set_title('MPC Trajectory Tracking with Learned Cost', fontsize=12, fontweight='bold')
ax1.legend()
# 2. 动作时序
ax2 = fig.add_subplot(222)
time_steps = np.arange(len(actions)) * 0.01
ax2.plot(time_steps, actions[:, 0], label='Fx', linewidth=2)
ax2.plot(time_steps, actions[:, 1], label='Fy', linewidth=2)
ax2.plot(time_steps, actions[:, 2], label='Fz', linewidth=2)
ax2.axhline(y=mpc.force_limit, color='r', linestyle='--', alpha=0.5, label='Limit')
ax2.axhline(y=-mpc.force_limit, color='r', linestyle='--', alpha=0.5)
ax2.set_xlabel('Time (s)')
ax2.set_ylabel('Force (N)')
ax2.set_title('Control Forces', fontsize=12, fontweight='bold')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. 优化时间
ax3 = fig.add_subplot(223)
mask = opt_times > 0
opt_steps = np.where(mask)[0]
opt_vals = opt_times[mask] * 1000 # 转换为ms
ax3.bar(opt_steps, opt_vals, color='#2E86AB', alpha=0.8, width=4)
ax3.axhline(y=np.mean(opt_vals), color='red', linestyle='--',
label=f'Mean: {np.mean(opt_vals):.1f}ms')
ax3.set_xlabel('Time Step')
ax3.set_ylabel('Optimization Time (ms)')
ax3.set_title('MPC Optimization Latency', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3, axis='y')
# 4. 约束违反
ax4 = fig.add_subplot(224)
ax4.fill_between(range(len(violations)), violations, alpha=0.5, color='#F18F01')
ax4.plot(violations, color='#C73E1D', linewidth=2)
ax4.set_xlabel('Time Step')
ax4.set_ylabel('Constraint Violations')
ax4.set_title('Constraint Violation History', fontsize=12, fontweight='bold')
ax4.grid(True, alpha=0.3)
plt.suptitle('Learning-based MPC for Robot Control', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('learning_based_mpc.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 learning_based_mpc.png")
plt.show()
def main():
print("初始化基于学习的MPC控制器...")
global mpc
mpc = LearningBasedMPC(horizon=20, dt=0.01)
# 初始状态和目标状态
initial_state = np.array([0.5, -0.5, 0.2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])
target_state = np.array([0.8, 0.2, 0.6, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0.5])
print("运行MPC控制模拟...")
states, actions, opt_times, violations = simulate_mpc_control(
mpc, initial_state, target_state, sim_steps=200
)
print("生成可视化...")
visualize_mpc_results(states, actions, opt_times, violations, target_state)
print(f"平均优化时间: {np.mean(opt_times[opt_times > 0])*1000:.2f} ms")
print(f"总约束违反次数: {np.sum(violations)}")
if __name__ == "__main__":
main()
脚本5 残差策略学习、物理约束硬编码、安全滤波器设计
"""
Script 5: residual_learning_safety.py
Content: 残差策略学习、物理约束硬编码、安全滤波器设计
Usage:
1. 安装依赖: pip install torch numpy matplotlib scipy
2. 运行: python residual_learning_safety.py
3. 输出: 残差修正可视化、安全边界分析、约束满足率统计
Features:
- 残差策略网络实现
- 安全滤波器(投影层)
- 物理一致性约束
- 安全区域可视化
"""
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle, Polygon
from matplotlib.collections import PatchCollection
import matplotlib.patches as mpatches
from scipy.optimize import minimize
from typing import Tuple, Optional
class NominalMPC:
"""标称MPC(简化模型)"""
def __init__(self):
self.dt = 0.01
def compute_control(self, state: np.ndarray, target: np.ndarray) -> np.ndarray:
"""基于简化动力学计算控制"""
# PD控制器作为标称控制器
kp, kd = 10.0, 2.0
pos_error = target[:3] - state[:3]
vel_error = -state[3:6]
force = kp * pos_error + kd * vel_error
force = np.clip(force, -10, 10)
# 姿态控制(简化)
torque = np.zeros(3)
gripper = target[-1] if len(target) > 7 else 0.0
return np.concatenate([force, torque, [gripper]])
class ResidualPolicy(nn.Module):
"""残差策略网络"""
def __init__(self, state_dim: int = 14, action_dim: int = 7):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim * 2, 256), # 当前状态和目标状态
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, action_dim),
nn.Tanh() # 输出范围[-1, 1]
)
self.scale = 2.0 # 残差缩放因子
def forward(self, state: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
x = torch.cat([state, target], dim=-1)
return self.net(x) * self.scale
class SafetyFilter:
"""安全滤波器(凸优化层)"""
def __init__(self):
self.pos_limits = np.array([[-1.0, -1.0, 0.0], [1.0, 1.0, 1.5]])
self.max_vel = 1.0
self.max_force = 15.0
def check_safety(self, state: np.ndarray, action: np.ndarray) -> Tuple[bool, str]:
"""检查状态-动作对的安全性"""
next_state = state.copy()
next_state[:3] += state[3:6] * 0.01 + 0.5 * action[:3] * 0.01**2
next_state[3:6] += action[:3] * 0.01
# 检查位置限制
if np.any(next_state[:3] < self.pos_limits[0]) or np.any(next_state[:3] > self.pos_limits[1]):
return False, "Position limit violation"
# 检查速度限制
if np.linalg.norm(next_state[3:6]) > self.max_vel:
return False, "Velocity limit violation"
# 检查力限制
if np.linalg.norm(action[:3]) > self.max_force:
return False, "Force limit violation"
return True, "Safe"
def project_action(self, state: np.ndarray, proposed_action: np.ndarray) -> np.ndarray:
"""将提议动作投影到安全区域"""
def objective(a):
return np.sum((a - proposed_action)**2)
def constraint_pos(a):
next_pos = state[:3] + state[3:6] * 0.01 + 0.5 * a[:3] * 0.01**2
return np.min([
np.min(next_pos - self.pos_limits[0]),
np.min(self.pos_limits[1] - next_pos)
])
def constraint_vel(a):
next_vel = state[3:6] + a[:3] * 0.01
return self.max_vel - np.linalg.norm(next_vel)
def constraint_force(a):
return self.max_force - np.linalg.norm(a[:3])
constraints = [
{'type': 'ineq', 'fun': constraint_pos},
{'type': 'ineq', 'fun': constraint_vel},
{'type': 'ineq', 'fun': constraint_force}
]
result = minimize(objective, proposed_action, method='SLSQP',
constraints=constraints, options={'maxiter': 10})
return result.x if result.success else proposed_action * 0.5
class ResidualLearningController:
"""残差学习控制器"""
def __init__(self, use_residual: bool = True, use_safety: bool = True):
self.nominal_mpc = NominalMPC()
self.residual_policy = ResidualPolicy()
self.safety_filter = SafetyFilter()
self.use_residual = use_residual
self.use_safety = use_safety
# 统计数据
self.safety_interventions = 0
self.total_steps = 0
def control(self, state: np.ndarray, target: np.ndarray) -> Tuple[np.ndarray, dict]:
"""计算控制指令"""
# 标称控制
nominal_action = self.nominal_mpc.compute_control(state, target)
# 残差修正
if self.use_residual:
with torch.no_grad():
state_t = torch.FloatTensor(state).unsqueeze(0)
target_t = torch.FloatTensor(target).unsqueeze(0)
residual = self.residual_policy(state_t, target_t).numpy()[0]
proposed_action = nominal_action + residual
else:
proposed_action = nominal_action
residual = np.zeros_like(nominal_action)
# 安全检查与投影
is_safe, reason = self.safety_filter.check_safety(state, proposed_action)
if not is_safe and self.use_safety:
safe_action = self.safety_filter.project_action(state, proposed_action)
self.safety_interventions += 1
else:
safe_action = proposed_action
self.total_steps += 1
info = {
'nominal': nominal_action,
'residual': residual,
'proposed': proposed_action,
'safe': safe_action,
'intervention': not is_safe,
'reason': reason
}
return safe_action, info
def get_safety_rate(self) -> float:
"""获取安全干预率"""
if self.total_steps == 0:
return 0.0
return self.safety_interventions / self.total_steps
def simulate_scenarios():
"""模拟不同场景下的控制性能"""
scenarios = [
{"name": "Nominal Only", "use_residual": False, "use_safety": False},
{"name": "Residual Only", "use_residual": True, "use_safety": False},
{"name": "Safety Only", "use_residual": False, "use_safety": True},
{"name": "Residual + Safety", "use_residual": True, "use_safety": True}
]
results = []
for scenario in scenarios:
print(f"运行场景: {scenario['name']}")
controller = ResidualLearningController(
use_residual=scenario['use_residual'],
use_safety=scenario['use_safety']
)
# 模拟轨迹跟踪
state = np.array([0.0, 0.0, 0.5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])
target = np.array([0.8, 0.5, 0.3, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0.5])
states = [state.copy()]
actions = []
interventions = []
for step in range(300):
action, info = controller.control(state, target)
state = state + np.concatenate([action[:3]*0.01, action[:3]*0.1,
np.zeros(7), [action[-1]*0.01]])
states.append(state.copy())
actions.append(action)
interventions.append(info['intervention'])
results.append({
'name': scenario['name'],
'states': np.array(states),
'actions': np.array(actions),
'interventions': np.array(interventions),
'safety_rate': controller.get_safety_rate()
})
return results
def visualize_residual_control(results: list):
"""可视化残差控制结果"""
fig = plt.figure(figsize=(18, 10))
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
# 1. 轨迹对比
ax1 = fig.add_subplot(231, projection='3d')
for i, result in enumerate(results):
states = result['states']
ax1.plot(states[:, 0], states[:, 1], states[:, 2],
color=colors[i], linewidth=2, label=result['name'], alpha=0.8)
ax1.set_xlabel('X (m)')
ax1.set_ylabel('Y (m)')
ax1.set_zlabel('Z (m)')
ax1.set_title('Trajectory Comparison', fontsize=12, fontweight='bold')
ax1.legend()
# 2. 安全干预率
ax2 = fig.add_subplot(232)
names = [r['name'] for r in results]
rates = [r['safety_rate'] * 100 for r in results]
bars = ax2.bar(names, rates, color=colors, alpha=0.8, edgecolor='black', linewidth=2)
ax2.set_ylabel('Safety Intervention Rate (%)')
ax2.set_title('Safety Filter Activity', fontsize=12, fontweight='bold')
ax2.tick_params(axis='x', rotation=15)
for bar, rate in zip(bars, rates):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{rate:.1f}%', ha='center', va='bottom', fontsize=10)
# 3. 残差修正幅度(仅Residual+Safety场景)
ax3 = fig.add_subplot(233)
residual_result = results[3]
actions = residual_result['actions']
time_steps = np.arange(len(actions)) * 0.01
# 模拟标称动作(实际应从控制器记录)
nominal_actions = np.zeros_like(actions)
residuals = actions - nominal_actions
ax3.plot(time_steps, np.linalg.norm(residuals[:, :3], axis=1),
color='#A23B72', linewidth=2, label='Residual Magnitude')
ax3.fill_between(time_steps, 0, np.linalg.norm(residuals[:, :3], axis=1),
alpha=0.3, color='#A23B72')
ax3.set_xlabel('Time (s)')
ax3.set_ylabel('Residual Norm')
ax3.set_title('Residual Correction Magnitude', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. 安全区域可视化(2D投影)
ax4 = fig.add_subplot(234)
# 绘制工作空间边界
limits = np.array([[-1, -1], [1, 1]])
rect = Rectangle((limits[0,0], limits[0,1]),
limits[1,0]-limits[0,0],
limits[1,1]-limits[0,1],
fill=False, edgecolor='red', linewidth=3, linestyle='--', label='Safety Boundary')
ax4.add_patch(rect)
# 绘制轨迹
for i, result in enumerate(results):
states = result['states']
ax4.plot(states[:, 0], states[:, 1], color=colors[i], linewidth=2, alpha=0.7)
ax4.scatter(states[0, 0], states[0, 1], color=colors[i], s=100, marker='o', zorder=5)
ax4.scatter(states[-1, 0], states[-1, 1], color=colors[i], s=100, marker='x', zorder=5)
ax4.set_xlabel('X (m)')
ax4.set_ylabel('Y (m)')
ax4.set_title('2D Position Constraints', fontsize=12, fontweight='bold')
ax4.legend()
ax4.grid(True, alpha=0.3)
ax4.set_xlim(-1.2, 1.2)
ax4.set_ylim(-1.2, 1.2)
# 5. 动作约束满足情况
ax5 = fig.add_subplot(235)
force_limits = []
for result in results:
actions = result['actions']
force_norms = np.linalg.norm(actions[:, :3], axis=1)
violations = np.sum(force_norms > 15.0)
force_limits.append(violations)
bars = ax5.bar(names, force_limits, color=colors, alpha=0.8, edgecolor='black', linewidth=2)
ax5.set_ylabel('Constraint Violations')
ax5.set_title('Force Limit Violations', fontsize=12, fontweight='bold')
ax5.tick_params(axis='x', rotation=15)
# 6. 安全干预时序
ax6 = fig.add_subplot(236)
interventions = results[3]['interventions'] # Residual + Safety
time_steps = np.arange(len(interventions)) * 0.01
ax6.fill_between(time_steps, 0, interventions, alpha=0.5, color='#C73E1D', label='Intervention')
ax6.plot(time_steps, interventions, color='#C73E1D', linewidth=2)
ax6.set_xlabel('Time (s)')
ax6.set_ylabel('Intervention Flag')
ax6.set_title('Safety Filter Interventions Over Time', fontsize=12, fontweight='bold')
ax6.set_ylim(-0.1, 1.1)
ax6.legend()
plt.suptitle('Residual Learning with Safety Constraints', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('residual_learning_safety.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 residual_learning_safety.png")
plt.show()
def main():
print("初始化残差学习控制器...")
print("运行多场景对比模拟...")
results = simulate_scenarios()
print("生成可视化...")
visualize_residual_control(results)
print("\n性能总结:")
for result in results:
print(f"{result['name']:20s}: 安全干预率 = {result['safety_rate']*100:.2f}%")
if __name__ == "__main__":
main()
脚本6 分层控制架构、动作缓冲与插值、延迟补偿
"""
Script 6: hierarchical_control_architecture.py
Content: 分层控制架构、动作缓冲与插值、延迟补偿
Usage:
1. 安装依赖: pip install torch numpy matplotlib scipy
2. 运行: python hierarchical_control_architecture.py
3. 输出: 分层控制时序图、延迟补偿效果、动作平滑性分析
Features:
- 高层策略(低频)与低层MPC(高频)协同
- 动作缓冲区管理
- 状态预测器(延迟补偿)
- 分层架构性能分析
"""
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
from scipy.interpolate import interp1d
from typing import Optional, List, Tuple
import time
class HighLevelPolicy(nn.Module):
"""高层策略网络(低频运行)"""
def __init__(self, obs_dim: int = 512, goal_dim: int = 3):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim + goal_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 7) # 目标位姿
)
self.update_rate = 10 # Hz
def forward(self, obs: torch.Tensor, goal: torch.Tensor) -> torch.Tensor:
x = torch.cat([obs, goal], dim=-1)
return self.net(x)
class LowLevelMPC:
"""低层MPC控制器(高频运行)"""
def __init__(self, dt: float = 0.001):
self.dt = dt
self.control_rate = 200 # Hz
self.kp = np.array([100, 100, 100, 50, 50, 50])
self.kd = np.array([20, 20, 20, 10, 10, 10])
def track(self, current: np.ndarray, target: np.ndarray) -> np.ndarray:
"""PD跟踪控制"""
error = target - current
# 简化的PD控制
action = self.kp * error[:6] * 0.01 # 缩放以适应动作范围
return np.clip(action, -1, 1)
class StatePredictor:
"""状态预测器(用于延迟补偿)"""
def __init__(self, delay_steps: int = 5):
self.delay_steps = delay_steps
self.history = deque(maxlen=50)
self.velocity_estimate = np.zeros(14)
def update(self, state: np.ndarray):
"""更新历史状态"""
self.history.append(state.copy())
if len(self.history) >= 2:
# 估计速度
recent = np.array(list(self.history)[-5:])
if len(recent) > 1:
self.velocity_estimate = (recent[-1] - recent[0]) / (0.01 * len(recent))
def predict(self, current_state: np.ndarray, delay: float) -> np.ndarray:
"""预测未来状态"""
# 简单的线性预测
predicted = current_state + self.velocity_estimate * delay
return predicted
class ActionBuffer:
"""动作缓冲区(管理高层指令)"""
def __init__(self, size: int = 100):
self.buffer = deque(maxlen=size)
self.interpolation_mode = 'cubic'
def push(self, action: np.ndarray):
"""添加新动作"""
self.buffer.append(action.copy())
def get_interpolated(self, indices: np.ndarray) -> np.ndarray:
"""获取插值后的动作"""
if len(self.buffer) < 2:
return np.zeros(7)
buffer_array = np.array(self.buffer)
x = np.linspace(0, 1, len(self.buffer))
x_new = np.clip(indices, 0, 1)
interpolated = np.zeros((len(indices), buffer_array.shape[1]))
for i in range(buffer_array.shape[1]):
f = interp1d(x, buffer_array[:, i], kind=self.interpolation_mode,
fill_value='extrapolate')
interpolated[:, i] = f(x_new)
return interpolated
class HierarchicalController:
"""分层控制器"""
def __init__(self, delay_ms: float = 20.0):
self.high_level = HighLevelPolicy()
self.low_level = LowLevelMPC()
self.predictor = StatePredictor(delay_steps=int(delay_ms / 5))
self.buffer = ActionBuffer()
self.delay = delay_ms / 1000.0 # 转换为秒
self.high_level_timer = 0.0
self.last_high_level_action = np.zeros(7)
# 性能监控
self.high_level_times = []
self.low_level_times = []
self.latency_compensation_errors = []
def step(self, obs: torch.Tensor, goal: torch.Tensor,
current_state: np.ndarray, time: float) -> Tuple[np.ndarray, dict]:
"""
执行控制步
返回: (控制指令, 调试信息)
"""
info = {}
# 延迟补偿:预测未来状态
start_time = time_module.time()
predicted_state = self.predictor.predict(current_state, self.delay)
self.predictor.update(current_state)
# 高层策略(低频)
if time - self.high_level_timer >= 1.0 / self.high_level.update_rate:
with torch.no_grad():
high_action = self.high_level(obs, goal).numpy()[0]
self.last_high_level_action = high_action
self.buffer.push(high_action)
self.high_level_timer = time
info['high_level_triggered'] = True
self.high_level_times.append(time_module.time() - start_time)
else:
info['high_level_triggered'] = False
# 低层控制(高频)
low_start = time_module.time()
# 从缓冲区获取平滑的目标
if len(self.buffer.buffer) > 0:
target = self.buffer.get_interpolated(np.array([0.5]))[0] # 中间点
else:
target = self.last_high_level_action
# 混合:高层目标 + 低层修正
low_action = self.low_level.track(predicted_state, target)
# 动作混合(平滑过渡)
alpha = 0.7 # 高层权重
final_action = alpha * self.last_high_level_action + (1 - alpha) * low_action
self.low_level_times.append(time_module.time() - low_start)
# 计算补偿误差(模拟)
actual_future = current_state + np.random.randn(14) * 0.01 # 模拟真实未来
error = np.linalg.norm(predicted_state - actual_future)
self.latency_compensation_errors.append(error)
info['predicted_state'] = predicted_state
info['compensation_error'] = error
return final_action, info
import time as time_module
def simulate_hierarchical_control():
"""模拟分层控制"""
controller = HierarchicalController(delay_ms=20.0)
# 模拟参数
sim_duration = 5.0 # 秒
dt = 0.005 # 仿真步长
steps = int(sim_duration / dt)
# 初始状态和目标
state = np.zeros(14)
state[:3] = [0.2, 0.1, 0.3]
goal = torch.tensor([[0.8, 0.5, 0.2]])
# 记录数据
states = [state.copy()]
actions = []
high_level_flags = []
compensation_errors = []
buffer_sizes = []
for step in range(steps):
t = step * dt
# 模拟观测(带噪声)
obs = torch.randn(1, 512) * 0.1
obs[0, :3] = torch.tensor(state[:3])
# 控制步
action, info = controller.step(obs, goal, state, t)
# 模拟状态更新(简化动力学)
state = state + np.concatenate([action[:3]*dt*0.5, action[:3]*dt,
np.zeros(7), [action[-1]*dt]])
states.append(state.copy())
actions.append(action)
high_level_flags.append(info['high_level_triggered'])
compensation_errors.append(info['compensation_error'])
buffer_sizes.append(len(controller.buffer.buffer))
return (np.array(states), np.array(actions), np.array(high_level_flags),
np.array(compensation_errors), np.array(buffer_sizes),
controller)
def visualize_hierarchical_architecture(states, actions, high_flags, comp_errors,
buffer_sizes, controller):
"""可视化分层架构性能"""
fig = plt.figure(figsize=(18, 12))
time_axis = np.arange(len(states)) * 0.005
# 1. 控制架构示意图
ax1 = fig.add_subplot(231)
ax1.axis('off')
ax1.set_xlim(0, 10)
ax1.set_ylim(0, 10)
# 绘制框图
boxes = [
(1, 7, 'High-Level Policy\n(10 Hz)', '#2E86AB'),
(1, 4, 'Action Buffer\n& Interpolation', '#A23B72'),
(1, 1, 'Low-Level MPC\n(200 Hz)', '#F18F01'),
(6, 4, 'State Predictor\n(Delay Comp)', '#C73E1D'),
(6, 1, 'Robot Plant', '#3B1F2B')
]
for x, y, text, color in boxes:
rect = plt.Rectangle((x, y), 3, 2, facecolor=color, alpha=0.7, edgecolor='black', linewidth=2)
ax1.add_patch(rect)
ax1.text(x+1.5, y+1, text, ha='center', va='center', fontsize=10, fontweight='bold', color='white')
# 绘制箭头
arrows = [(2.5, 7, 2.5, 6), (2.5, 4, 2.5, 3), (2.5, 1, 2.5, 0.5),
(4, 1.5, 6, 4), (7.5, 4, 7.5, 3), (7.5, 1, 7.5, 0.5)]
for x1, y1, x2, y2 in arrows:
ax1.annotate('', xy=(x2, y2), xytext=(x1, y1),
arrowprops=dict(arrowstyle='->', lw=2, color='black'))
ax1.set_title('Hierarchical Control Architecture', fontsize=12, fontweight='bold')
# 2. 状态轨迹(显示高层更新点)
ax2 = fig.add_subplot(232, projection='3d')
ax2.plot(states[:, 0], states[:, 1], states[:, 2], 'b-', linewidth=1, alpha=0.6, label='Trajectory')
# 标记高层触发点
high_indices = np.where(high_flags)[0]
if len(high_indices) > 0:
ax2.scatter(states[high_indices, 0], states[high_indices, 1], states[high_indices, 2],
c='red', s=50, marker='^', label='High-Level Update', zorder=5)
ax2.set_xlabel('X (m)')
ax2.set_ylabel('Y (m)')
ax2.set_zlabel('Z (m)')
ax2.set_title('3D Trajectory with High-Level Updates', fontsize=12, fontweight='bold')
ax2.legend()
# 3. 动作时序(分层对比)
ax3 = fig.add_subplot(233)
ax3.plot(time_axis[:len(actions)], actions[:, 0], label='X Action', linewidth=1.5)
ax3.scatter(time_axis[high_indices][:len(high_indices)],
actions[high_indices[:len(actions[high_indices])], 0],
c='red', s=30, zorder=5, label='High-Level Command')
ax3.set_xlabel('Time (s)')
ax3.set_ylabel('Action Value')
ax3.set_title('Hierarchical Action Commands', fontsize=12, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. 缓冲区大小变化
ax4 = fig.add_subplot(234)
ax4.plot(time_axis[:len(buffer_sizes)], buffer_sizes, color='#A23B72', linewidth=2)
ax4.fill_between(time_axis[:len(buffer_sizes)], 0, buffer_sizes, alpha=0.3, color='#A23B72')
ax4.set_xlabel('Time (s)')
ax4.set_ylabel('Buffer Size')
ax4.set_title('Action Buffer Occupancy', fontsize=12, fontweight='bold')
ax4.grid(True, alpha=0.3)
# 5. 延迟补偿误差
ax5 = fig.add_subplot(235)
ax5.plot(time_axis[:len(comp_errors)], comp_errors, color='#C73E1D', linewidth=1.5)
ax5.axhline(y=np.mean(comp_errors), color='black', linestyle='--',
label=f'Mean: {np.mean(comp_errors):.4f}')
ax5.set_xlabel('Time (s)')
ax5.set_ylabel('Prediction Error')
ax5.set_title('Latency Compensation Error', fontsize=12, fontweight='bold')
ax5.legend()
ax5.grid(True, alpha=0.3)
# 6. 控制频率分析
ax6 = fig.add_subplot(236)
# 计算实际频率
high_times = np.array(controller.high_level_times) * 1000 # ms
low_times = np.array(controller.low_level_times) * 1000
data = [high_times[high_times < 10], low_times[low_times < 2]] # 过滤异常值
labels = ['High-Level\n(10Hz)', 'Low-Level\n(200Hz)']
bp = ax6.boxplot(data, labels=labels, patch_artist=True)
colors = ['#2E86AB', '#F18F01']
for patch, color in zip(bp['boxes'], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ax6.set_ylabel('Execution Time (ms)')
ax6.set_title('Control Loop Timing Analysis', fontsize=12, fontweight='bold')
ax6.grid(True, alpha=0.3, axis='y')
plt.suptitle('Hierarchical Control Architecture Performance', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('hierarchical_control.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 hierarchical_control.png")
plt.show()
def main():
print("初始化分层控制器...")
print("运行模拟...")
states, actions, high_flags, comp_errors, buffer_sizes, controller = simulate_hierarchical_control()
print("生成可视化...")
visualize_hierarchical_architecture(states, actions, high_flags, comp_errors,
buffer_sizes, controller)
print(f"\n性能统计:")
print(f"高层平均执行时间: {np.mean(controller.high_level_times)*1000:.2f} ms")
print(f"低层平均执行时间: {np.mean(controller.low_level_times)*1000:.2f} ms")
print(f"延迟补偿平均误差: {np.mean(comp_errors):.4f}")
if __name__ == "__main__":
main()
脚本7 GelSight图像处理、接触几何重建、滑移检测、力分布估计
"""
Script 7: high_frequency_tactile_sensing.py
Content: GelSight图像处理、接触几何重建、滑移检测、力分布估计
Usage:
1. 安装依赖: pip install torch numpy matplotlib opencv-python scipy scikit-image
2. 运行: python high_frequency_tactile_sensing.py
3. 输出: 触觉图像处理流水线、接触几何可视化、滑移检测时序
Features:
- 高频率触觉图像处理(>100Hz)
- 光度立体视觉重建
- 实时滑移检测算法
- 力分布估计网络
"""
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, FancyArrowPatch
from matplotlib.collections import LineCollection
from scipy.ndimage import gaussian_filter, sobel
from scipy.signal import medfilt
from typing import Tuple, Optional, List
import cv2
from dataclasses import dataclass
@dataclass
class TactileConfig:
image_size: int = 240
fps: int = 120
gel_depth: float = 0.005 # 凝胶厚度 (m)
pix_per_mm: float = 20.0
contact_threshold: float = 0.02
class TactileImageProcessor:
"""触觉图像处理器"""
def __init__(self, config: TactileConfig):
self.config = config
self.bg_image = None
self.ref_image = None
def set_background(self, bg_image: np.ndarray):
"""设置背景图像(无接触时)"""
self.bg_image = cv2.GaussianBlur(bg_image, (5, 5), 0)
def subtract_background(self, image: np.ndarray) -> np.ndarray:
"""背景减除"""
if self.bg_image is None:
return image
diff = cv2.absdiff(image, self.bg_image)
return diff
def enhance_contrast(self, image: np.ndarray) -> np.ndarray:
"""对比度增强"""
lab = cv2.cvtColor(image, cv2.COLOR_RGB2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
l = clahe.apply(l)
enhanced = cv2.merge([l, a, b])
return cv2.cvtColor(enhanced, cv2.COLOR_LAB2RGB)
def detect_contact_region(self, image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""检测接触区域"""
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# 自适应阈值
thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2)
# 形态学操作
kernel = np.ones((5, 5), np.uint8)
mask = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
# 查找轮廓
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
contact_mask = np.zeros_like(gray)
if contours:
largest_contour = max(contours, key=cv2.contourArea)
cv2.drawContours(contact_mask, [largest_contour], -1, 255, -1)
return contact_mask, largest_contour if contours else None
class GeometryReconstructor:
"""接触几何重建器(基于光度立体视觉)"""
def __init__(self, config: TactileConfig):
self.config = config
def estimate_gradient(self, image: np.ndarray, mask: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""估计表面梯度"""
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY).astype(float)
# Sobel梯度
sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
# 仅在接触区域内应用
sobelx *= (mask / 255)
sobely *= (mask / 255)
# 归一化
sobelx = np.tanh(sobelx / 50) * self.config.gel_depth
sobely = np.tanh(sobely / 50) * self.config.gel_depth
return sobelx, sobely
def integrate_gradient(self, gx: np.ndarray, gy: np.ndarray) -> np.ndarray:
"""积分梯度重建深度图(Frankot-Chellappa算法)"""
# 简化实现:使用泊松重建
from scipy.fftpack import fft2, ifft2, fftfreq
rows, cols = gx.shape
wx = 2 * np.pi * fftfreq(cols).reshape(1, -1)
wy = 2 * np.pi * fftfreq(rows).reshape(-1, 1)
# 傅里叶变换
Gx = fft2(gx)
Gy = fft2(gy)
# 频域积分
denom = wx**2 + wy**2
denom[0, 0] = 1 # 避免除零
Z = -1j * (wx * Gx + wy * Gy) / denom
# 逆变换
depth = np.real(ifft2(Z))
return depth
def compute_surface_normals(self, depth: np.ndarray) -> np.ndarray:
"""计算表面法向"""
gx = cv2.Sobel(depth, cv2.CV_64F, 1, 0, ksize=3)
gy = cv2.Sobel(depth, cv2.CV_64F, 0, 1, ksize=3)
normals = np.dstack([-gx, -gy, np.ones_like(depth)])
norm = np.linalg.norm(normals, axis=2, keepdims=True)
normals = normals / (norm + 1e-8)
return normals
class SlipDetector:
"""滑移检测器"""
def __init__(self, window_size: int = 10):
self.window_size = window_size
self.flow_history = []
self.contact_centroid_history = []
def compute_optical_flow(self, prev_img: np.ndarray, curr_img: np.ndarray) -> np.ndarray:
"""计算光流"""
prev_gray = cv2.cvtColor(prev_img, cv2.COLOR_RGB2GRAY)
curr_gray = cv2.cvtColor(curr_img, cv2.COLOR_RGB2GRAY)
flow = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, None, None)[0]
return flow
def detect_slip(self, flow: np.ndarray, contact_mask: np.ndarray) -> Tuple[bool, float]:
"""检测滑移"""
if flow is None or len(flow) == 0:
return False, 0.0
# 计算接触区域内的平均流
mask_bool = contact_mask > 0
if not np.any(mask_bool):
return False, 0.0
flow_magnitude = np.linalg.norm(flow, axis=2)
contact_flow = flow_magnitude[mask_bool]
mean_flow = np.mean(contact_flow)
std_flow = np.std(contact_flow)
# 滑移阈值
slip_score = mean_flow + std_flow
is_slipping = slip_score > 2.0
return is_slipping, slip_score
def track_contact_movement(self, contour: Optional[np.ndarray]) -> np.ndarray:
"""跟踪接触点移动"""
if contour is None:
return np.array([0, 0])
M = cv2.moments(contour)
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
else:
cx, cy = 0, 0
centroid = np.array([cx, cy])
self.contact_centroid_history.append(centroid)
if len(self.contact_centroid_history) > self.window_size:
self.contact_centroid_history.pop(0)
return centroid
class ForceEstimator(nn.Module):
"""力分布估计网络"""
def __init__(self):
super().__init__()
self.encoder = nn.Sequential(
nn.Conv2d(3, 32, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(32, 64, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d(1)
)
self.regressor = nn.Sequential(
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 6) # 力螺旋 (Fx, Fy, Fz, Tx, Ty, Tz)
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
features = self.encoder(x)
features = features.view(features.size(0), -1)
force_torque = self.regressor(features)
return force_torque
def simulate_tactile_sequence():
"""模拟触觉图像序列"""
config = TactileConfig()
num_frames = 100
# 生成模拟触觉图像
frames = []
for i in range(num_frames):
# 基础图像(带有渐变)
img = np.zeros((config.image_size, config.image_size, 3), dtype=np.uint8)
# 添加接触区域(模拟圆形接触)
center = (config.image_size//2 + int(10*np.sin(i*0.1)),
config.image_size//2 + int(5*i)) # 模拟滑动
radius = 30 + int(5*np.sin(i*0.2))
cv2.circle(img, center, radius, (200, 150, 100), -1)
# 添加噪声和纹理
noise = np.random.normal(0, 10, img.shape).astype(np.uint8)
img = cv2.add(img, noise)
# 添加凝胶变形效果(径向渐变)
Y, X = np.ogrid[:config.image_size, :config.image_size]
dist_from_center = np.sqrt((X-center[0])**2 + (Y-center[1])**2)
deformation = np.exp(-dist_from_center / (radius * 0.5))
img = (img * deformation[:,:,None]).astype(np.uint8)
frames.append(img)
return frames, config
def visualize_tactile_processing(frames: List[np.ndarray], config: TactileConfig):
"""可视化触觉处理流水线"""
processor = TactileImageProcessor(config)
reconstructor = GeometryReconstructor(config)
slip_detector = SlipDetector()
force_estimator = ForceEstimator()
force_estimator.eval()
# 设置背景
processor.set_background(frames[0])
# 处理序列
contact_masks = []
depths = []
normals = []
slip_scores = []
forces = []
prev_frame = frames[0]
for i, frame in enumerate(frames[1:]):
# 背景减除
diff = processor.subtract_background(frame)
# 检测接触
mask, contour = processor.detect_contact_region(diff)
contact_masks.append(mask)
# 几何重建
if np.any(mask > 0):
gx, gy = reconstructor.estimate_gradient(diff, mask)
depth = reconstructor.integrate_gradient(gx, gy)
normal = reconstructor.compute_surface_normals(depth)
depths.append(depth)
normals.append(normal)
else:
depths.append(np.zeros((config.image_size, config.image_size)))
normals.append(np.zeros((config.image_size, config.image_size, 3)))
# 滑移检测
flow = slip_detector.compute_optical_flow(prev_frame, frame)
is_slip, score = slip_detector.detect_slip(flow, mask)
slip_scores.append(score)
slip_detector.track_contact_movement(contour)
# 力估计
frame_tensor = torch.FloatTensor(frame).permute(2, 0, 1).unsqueeze(0) / 255.0
with torch.no_grad():
force = force_estimator(frame_tensor).numpy()[0]
forces.append(force)
prev_frame = frame
# 可视化
fig = plt.figure(figsize=(18, 12))
# 1. 原始图像示例
ax1 = fig.add_subplot(231)
ax1.imshow(frames[50])
ax1.set_title('Raw Tactile Image', fontsize=12, fontweight='bold')
ax1.axis('off')
# 2. 接触分割
ax2 = fig.add_subplot(232)
overlay = frames[50].copy()
overlay[contact_masks[49] > 0] = [255, 0, 0]
ax2.imshow(overlay)
ax2.set_title('Contact Segmentation', fontsize=12, fontweight='bold')
ax2.axis('off')
# 3. 深度重建
ax3 = fig.add_subplot(233)
im = ax3.imshow(depths[49], cmap='viridis')
ax3.set_title('Reconstructed Depth', fontsize=12, fontweight='bold')
ax3.axis('off')
plt.colorbar(im, ax=ax3, fraction=0.046, pad=0.04)
# 4. 表面法向可视化
ax4 = fig.add_subplot(234)
normal_vis = (normals[49] + 1) / 2 # 映射到[0,1]
ax4.imshow(normal_vis)
ax4.set_title('Surface Normals', fontsize=12, fontweight='bold')
ax4.axis('off')
# 5. 滑移检测时序
ax5 = fig.add_subplot(235)
time_axis = np.arange(len(slip_scores)) / config.fps
ax5.plot(time_axis, slip_scores, linewidth=2, color='#C73E1D')
ax5.axhline(y=2.0, color='red', linestyle='--', label='Slip Threshold')
ax5.fill_between(time_axis, 0, slip_scores, alpha=0.3, color='#C73E1D')
ax5.set_xlabel('Time (s)')
ax5.set_ylabel('Slip Score')
ax5.set_title('Slip Detection Over Time', fontsize=12, fontweight='bold')
ax5.legend()
ax5.grid(True, alpha=0.3)
# 6. 力估计
ax6 = fig.add_subplot(236)
forces = np.array(forces)
time_axis = np.arange(len(forces)) / config.fps
ax6.plot(time_axis, forces[:, :3], label=['Fx', 'Fy', 'Fz'], linewidth=2)
ax6.set_xlabel('Time (s)')
ax6.set_ylabel('Force (N)')
ax6.set_title('Estimated Forces', fontsize=12, fontweight='bold')
ax6.legend()
ax6.grid(True, alpha=0.3)
plt.suptitle('High-Frequency Tactile Sensing Pipeline', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('tactile_processing.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 tactile_processing.png")
plt.show()
def main():
print("生成模拟触觉序列...")
frames, config = simulate_tactile_sequence()
print("处理触觉数据...")
visualize_tactile_processing(frames, config)
print(f"处理频率: {config.fps} Hz")
print(f"图像分辨率: {config.image_size}x{config.image_size}")
if __name__ == "__main__":
main()
脚本8 多模态时间同步、触觉特征编码器、接触事件触发的策略切换
"""
Script 8: tactile_visual_fusion.py
Content: 多模态时间同步、触觉特征编码器、接触事件触发的策略切换
Usage:
1. 安装依赖: pip install torch numpy matplotlib opencv-python
2. 运行: python tactile_visual_fusion.py
3. 输出: 多模态融合可视化、注意力热图、策略切换状态机
Features:
- 时间戳对齐与同步
- 触觉-视觉特征融合网络
- 接触事件检测与状态切换
- 多模态注意力可视化
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch
from matplotlib.colors import LinearSegmentedColormap
from typing import Dict, Tuple, Optional, List
import time
from dataclasses import dataclass
from enum import Enum
class ContactState(Enum):
APPROACH = "approach"
CONTACT = "contact"
MANIPULATION = "manipulation"
RELEASE = "release"
@dataclass
class ModalityConfig:
visual_freq: int = 30 # Hz
tactile_freq: int = 120 # Hz
latent_dim: int = 128
image_size: int = 224
class TemporalSynchronizer:
"""时间同步器"""
def __init__(self, config: ModalityConfig):
self.config = config
self.visual_buffer = []
self.tactile_buffer = []
self.visual_timestamp = []
self.tactile_timestamp = []
def add_visual(self, image: np.ndarray, timestamp: float):
"""添加视觉帧"""
self.visual_buffer.append(image)
self.visual_timestamp.append(timestamp)
def add_tactile(self, image: np.ndarray, timestamp: float):
"""添加触觉帧"""
self.tactile_buffer.append(image)
self.tactile_timestamp.append(timestamp)
def get_synchronized_pair(self, target_time: float) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
"""获取同步的图像对"""
# 找到最接近目标时间的视觉帧
if not self.visual_timestamp or not self.tactile_timestamp:
return None, None
visual_idx = np.argmin(np.abs(np.array(self.visual_timestamp) - target_time))
tactile_idx = np.argmin(np.abs(np.array(self.tactile_timestamp) - target_time))
# 检查时间差是否在阈值内
visual_diff = abs(self.visual_timestamp[visual_idx] - target_time)
tactile_diff = abs(self.tactile_timestamp[tactile_idx] - target_time)
if visual_diff < 1.0/self.config.visual_freq and tactile_diff < 1.0/self.config.tactile_freq:
return self.visual_buffer[visual_idx], self.tactile_buffer[tactile_idx]
return None, None
class VisualEncoder(nn.Module):
"""视觉编码器"""
def __init__(self, latent_dim: int = 128):
super().__init__()
self.conv = nn.Sequential(
nn.Conv2d(3, 32, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d(1)
)
self.fc = nn.Linear(128, latent_dim)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.conv(x)
x = x.view(x.size(0), -1)
return self.fc(x)
class TactileEncoder(nn.Module):
"""触觉编码器"""
def __init__(self, latent_dim: int = 128):
super().__init__()
self.conv = nn.Sequential(
nn.Conv2d(3, 32, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(32, 64, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d(1)
)
self.fc = nn.Linear(128, latent_dim)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.conv(x)
x = x.view(x.size(0), -1)
return self.fc(x)
class CrossModalAttention(nn.Module):
"""跨模态注意力机制"""
def __init__(self, dim: int = 128, num_heads: int = 4):
super().__init__()
self.num_heads = num_heads
self.scale = (dim // num_heads) ** -0.5
self.q_visual = nn.Linear(dim, dim)
self.k_tactile = nn.Linear(dim, dim)
self.v_tactile = nn.Linear(dim, dim)
self.out_proj = nn.Linear(dim, dim)
def forward(self, visual_feat: torch.Tensor, tactile_feat: torch.Tensor) -> torch.Tensor:
B, C = visual_feat.shape
# 生成Q, K, V
Q = self.q_visual(visual_feat).view(B, self.num_heads, C // self.num_heads)
K = self.k_tactile(tactile_feat).view(B, self.num_heads, C // self.num_heads)
V = self.v_tactile(tactile_feat).view(B, self.num_heads, C // self.num_heads)
# 注意力计算
attn = torch.einsum('bhd,bhd->bh', Q, K) * self.scale
attn = F.softmax(attn, dim=-1)
# 加权求和
out = torch.einsum('bh,bhd->bhd', attn, V)
out = out.view(B, C)
out = self.out_proj(out)
return out, attn
class MultimodalFusionPolicy(nn.Module):
"""多模态融合策略"""
def __init__(self, config: ModalityConfig):
super().__init__()
self.visual_encoder = VisualEncoder(config.latent_dim)
self.tactile_encoder = TactileEncoder(config.latent_dim)
self.cross_attn = CrossModalAttention(config.latent_dim)
self.fusion_net = nn.Sequential(
nn.Linear(config.latent_dim * 2, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, 7) # 动作输出
)
self.contact_classifier = nn.Sequential(
nn.Linear(config.latent_dim * 2, 64),
nn.ReLU(),
nn.Linear(64, 2) # 二分类:接触/非接触
)
def forward(self, visual: torch.Tensor, tactile: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
# 编码
v_feat = self.visual_encoder(visual)
t_feat = self.tactile_encoder(tactile)
# 跨模态注意力
fused_v, attn_weights = self.cross_attn(v_feat, t_feat)
# 特征拼接
combined = torch.cat([fused_v, v_feat], dim=-1)
# 动作输出
action = self.fusion_net(combined)
# 接触检测
contact_logit = self.contact_classifier(combined)
return action, contact_logit, attn_weights
class ContactStateMachine:
"""接触状态机"""
def __init__(self):
self.state = ContactState.APPROACH
self.contact_threshold = 0.5
self.release_threshold = 0.3
self.state_history = []
self.contact_force_threshold = 0.1
def update(self, contact_prob: float, force_magnitude: float) -> ContactState:
"""更新状态"""
prev_state = self.state
if self.state == ContactState.APPROACH:
if contact_prob > self.contact_threshold and force_magnitude > self.contact_force_threshold:
self.state = ContactState.CONTACT
elif self.state == ContactState.CONTACT:
if force_magnitude > self.contact_force_threshold * 2:
self.state = ContactState.MANIPULATION
elif contact_prob < self.release_threshold:
self.state = ContactState.RELEASE
elif self.state == ContactState.MANIPULATION:
if contact_prob < self.release_threshold:
self.state = ContactState.RELEASE
elif self.state == ContactState.RELEASE:
if contact_prob < self.release_threshold * 0.5:
self.state = ContactState.APPROACH
self.state_history.append(self.state)
return self.state
def get_modality_weights(self) -> Tuple[float, float]:
"""根据状态返回模态权重"""
if self.state == ContactState.APPROACH:
return 0.9, 0.1 # 视觉主导
elif self.state == ContactState.CONTACT:
return 0.5, 0.5 # 平衡
elif self.state == ContactState.MANIPULATION:
return 0.2, 0.8 # 触觉主导
else: # RELEASE
return 0.8, 0.2
def simulate_multimodal_sequence():
"""模拟多模态数据序列"""
config = ModalityConfig()
synchronizer = TemporalSynchronizer(config)
policy = MultimodalFusionPolicy(config)
state_machine = ContactStateMachine()
# 生成模拟数据
num_steps = 100
visual_data = []
tactile_data = []
contact_probs = []
forces = []
states = []
actions = []
current_time = 0.0
for step in range(num_steps):
# 模拟视觉数据(30Hz)
if step % 4 == 0:
visual_img = np.random.rand(3, 224, 224).astype(np.float32)
if 30 < step < 70: # 模拟接触阶段
visual_img[0, 100:150, 100:150] += 0.5 # 模拟物体
synchronizer.add_visual(visual_img, current_time)
visual_data.append(visual_img)
# 模拟触觉数据(120Hz)
tactile_img = np.random.rand(3, 224, 224).astype(np.float32) * 0.3
force_mag = 0.0
if 30 < step < 70: # 接触阶段
contact_intensity = np.sin((step - 30) / 40 * np.pi)
tactile_img[0, 80:160, 80:160] += contact_intensity * 0.7
force_mag = contact_intensity * 5.0
synchronizer.add_tactile(tactile_img, current_time)
tactile_data.append(tactile_img)
forces.append(force_mag)
# 策略推理
v_tensor = torch.FloatTensor(visual_img).unsqueeze(0)
t_tensor = torch.FloatTensor(tactile_img).unsqueeze(0)
with torch.no_grad():
action, contact_logit, attn = policy(v_tensor, t_tensor)
contact_prob = torch.softmax(contact_logit, dim=-1)[0, 1].item()
contact_probs.append(contact_prob)
# 状态机更新
current_state = state_machine.update(contact_prob, force_mag)
states.append(current_state)
# 根据状态调整动作(模拟)
vis_weight, tac_weight = state_machine.get_modality_weights()
adjusted_action = action.numpy()[0] * np.array([vis_weight] * 3 + [tac_weight] * 3 + [1])
actions.append(adjusted_action)
current_time += 1.0 / config.tactile_freq
return {
'visual': visual_data,
'tactile': tactile_data,
'contact_probs': contact_probs,
'forces': forces,
'states': states,
'actions': np.array(actions),
'attn': attn.numpy()
}
def visualize_multimodal_fusion(results: dict):
"""可视化多模态融合"""
fig = plt.figure(figsize=(18, 12))
# 1. 模态数据示例
ax1 = fig.add_subplot(231)
ax1.imshow(results['visual'][25].transpose(1, 2, 0))
ax1.set_title('Visual Input (Approach)', fontsize=11, fontweight='bold')
ax1.axis('off')
ax2 = fig.add_subplot(232)
ax2.imshow(results['tactile'][50].transpose(1, 2, 0))
ax2.set_title('Tactile Input (Contact)', fontsize=11, fontweight='bold')
ax2.axis('off')
# 2. 接触概率与力
ax3 = fig.add_subplot(233)
time_axis = np.arange(len(results['contact_probs'])) / 120.0
ax3.plot(time_axis, results['contact_probs'], label='Contact Probability',
linewidth=2, color='#2E86AB')
ax3.plot(time_axis, np.array(results['forces'])/5.0, label='Normalized Force',
linewidth=2, color='#F18F01')
# 标记状态
state_colors = {
ContactState.APPROACH: 'green',
ContactState.CONTACT: 'yellow',
ContactState.MANIPULATION: 'red',
ContactState.RELEASE: 'blue'
}
for i, state in enumerate(results['states']):
if i % 10 == 0:
ax3.axvspan(time_axis[i], time_axis[min(i+1, len(time_axis)-1)],
alpha=0.2, color=state_colors[state])
ax3.set_xlabel('Time (s)')
ax3.set_ylabel('Probability / Force')
ax3.set_title('Contact Detection & State Transition', fontsize=11, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 3. 状态机转换
ax4 = fig.add_subplot(234)
state_indices = [list(ContactState).index(s) for s in results['states']]
ax4.plot(time_axis, state_indices, linewidth=3, color='#A23B72')
ax4.set_yticks(range(4))
ax4.set_yticklabels(['APPROACH', 'CONTACT', 'MANIPULATION', 'RELEASE'])
ax4.set_xlabel('Time (s)')
ax4.set_title('Contact State Machine', fontsize=11, fontweight='bold')
ax4.grid(True, alpha=0.3)
# 4. 模态权重变化
ax5 = fig.add_subplot(235)
visual_weights = []
tactile_weights = []
for state in results['states']:
vw, tw = {ContactState.APPROACH: (0.9, 0.1),
ContactState.CONTACT: (0.5, 0.5),
ContactState.MANIPULATION: (0.2, 0.8),
ContactState.RELEASE: (0.8, 0.2)}[state]
visual_weights.append(vw)
tactile_weights.append(tw)
ax5.fill_between(time_axis, 0, visual_weights, alpha=0.5, label='Visual Weight', color='#3E92CC')
ax5.fill_between(time_axis, 0, tactile_weights, alpha=0.5, label='Tactile Weight', color='#AA6C39')
ax5.plot(time_axis, visual_weights, color='#3E92CC', linewidth=2)
ax5.plot(time_axis, tactile_weights, color='#AA6C39', linewidth=2)
ax5.set_xlabel('Time (s)')
ax5.set_ylabel('Modality Weight')
ax5.set_title('Adaptive Modality Fusion', fontsize=11, fontweight='bold')
ax5.legend()
ax5.set_ylim(0, 1)
# 5. 注意力热图
ax6 = fig.add_subplot(236)
attn_map = results['attn'][0].reshape(2, 2) # 简化展示
im = ax6.imshow(attn_map, cmap='viridis', aspect='auto')
ax6.set_title('Cross-Modal Attention', fontsize=11, fontweight='bold')
ax6.set_xticks([0, 1])
ax6.set_yticks([0, 1])
ax6.set_xticklabels(['Visual', 'Tactile'])
ax6.set_yticklabels(['Query', 'Key'])
plt.colorbar(im, ax=ax6)
plt.suptitle('Tactile-Visual Fusion for Adaptive Control', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('tactile_visual_fusion.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 tactile_visual_fusion.png")
plt.show()
def main():
print("初始化多模态融合系统...")
print("生成模拟序列...")
results = simulate_multimodal_sequence()
print("生成可视化...")
visualize_multimodal_fusion(results)
print("\n统计信息:")
state_counts = {}
for s in results['states']:
state_counts[s] = state_counts.get(s, 0) + 1
for state, count in state_counts.items():
print(f"{state.value}: {count} steps ({count/len(results['states'])*100:.1f}%)")
if __name__ == "__main__":
main()
脚本9 触觉反馈力调节、顺应控制参数学习、力-位混合控制
"""
Script 9: closed_loop_force_control.py
Content: 触觉反馈力调节、顺应控制参数学习、力-位混合控制
Usage:
1. 安装依赖: pip install torch numpy matplotlib
2. 运行: python closed_loop_force_control.py
3. 输出: 力跟踪曲线、顺应控制响应、混合控制策略可视化
Features:
- 力控制策略网络
- 自适应顺应控制
- 力-位混合控制实现
- 稳定性分析
"""
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch, Arrow
from matplotlib.collections import LineCollection
from typing import Tuple, List
from dataclasses import dataclass
@dataclass
class ImpedanceParams:
"""阻抗参数"""
M: float = 1.0 # 惯性
B: float = 10.0 # 阻尼
K: float = 50.0 # 刚度
class ForceControlPolicy(nn.Module):
"""力控制策略网络"""
def __init__(self, state_dim: int = 12, action_dim: int = 6):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, action_dim),
nn.Tanh()
)
self.force_scale = 10.0
def forward(self, force_error: torch.Tensor,
position: torch.Tensor,
velocity: torch.Tensor) -> torch.Tensor:
x = torch.cat([force_error, position, velocity], dim=-1)
return self.net(x) * self.force_scale
class AdaptiveComplianceController:
"""自适应顺应控制器"""
def __init__(self):
self.impedance = ImpedanceParams()
self.adaptation_rate = 0.01
self.force_error_integral = 0.0
def update_impedance(self, force_error: float, position_error: float):
"""根据力误差更新阻抗参数"""
self.force_error_integral += force_error * 0.01
# 自适应律:力误差大时降低刚度,位置误差大时增加刚度
self.impedance.K = np.clip(
self.impedance.K + self.adaptation_rate * (position_error - 0.1 * abs(force_error)),
10.0, 200.0
)
# 阻尼自适应
self.impedance.B = np.clip(
self.impedance.B + self.adaptation_rate * (abs(force_error) - 2.0),
5.0, 50.0
)
def compute_control(self, desired_force: np.ndarray,
actual_force: np.ndarray,
desired_pos: np.ndarray,
actual_pos: np.ndarray,
velocity: np.ndarray) -> np.ndarray:
"""计算顺应控制力"""
force_error = desired_force - actual_force
pos_error = desired_pos - actual_pos
# 更新参数
self.update_impedance(np.linalg.norm(force_error), np.linalg.norm(pos_error))
# 阻抗控制律
F_impedance = (self.impedance.K * pos_error +
self.impedance.B * velocity +
self.impedance.M * np.zeros_like(velocity)) # 加速度假设为0
# 力控制项
F_force = force_error * 0.5
return F_impedance + F_force
class HybridForcePositionController:
"""力-位混合控制器"""
def __init__(self, force_dim: int = 3):
self.force_dim = force_dim
self.force_control_axes = np.array([0, 0, 1, 0, 0, 0]) # Z轴力控
self.position_control_axes = 1 - self.force_control_axes
self.force_controller = ForceControlPolicy()
self.position_gain = 10.0
def selection_matrix(self) -> np.ndarray:
"""生成选择矩阵"""
S = np.diag(self.force_control_axes)
S_pos = np.diag(self.position_control_axes)
return S, S_pos
def control(self,
desired_force: np.ndarray,
actual_force: np.ndarray,
desired_pos: np.ndarray,
actual_pos: np.ndarray,
velocity: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
混合控制
返回: (总控制力, 力控制分量, 位置控制分量)
"""
S, S_pos = self.selection_matrix()
# 力控制(在选择方向上)
with torch.no_grad():
force_error = torch.FloatTensor(desired_force - actual_force).unsqueeze(0)
pos_t = torch.FloatTensor(actual_pos).unsqueeze(0)
vel_t = torch.FloatTensor(velocity).unsqueeze(0)
force_cmd = self.force_controller(force_error, pos_t, vel_t).numpy()[0]
# 位置控制(在正交方向上)
pos_error = desired_pos - actual_pos
position_cmd = self.position_gain * pos_error * self.position_control_axes[:3]
# 组合
total_cmd = S[:3, :3] @ force_cmd[:3] + S_pos[:3, :3] @ position_cmd
return total_cmd, force_cmd[:3], position_cmd
class ForceControlSimulator:
"""力控制仿真器"""
def __init__(self):
self.dt = 0.01
self.mass = 1.0
self.environment_stiffness = 1000.0 # 环境刚度
def simulate_contact_dynamics(self, position: np.ndarray,
force: np.ndarray,
surface_height: float = 0.5) -> Tuple[np.ndarray, np.ndarray]:
"""模拟接触动力学"""
# 检测接触
penetration = max(0, surface_height - position[2])
# 环境反作用力
if penetration > 0:
env_force = np.array([0, 0, self.environment_stiffness * penetration])
damping = -50 * position[2] # 阻尼
env_force[2] += damping
else:
env_force = np.zeros(3)
# 物体动力学
acceleration = (force[:3] + env_force) / self.mass
return acceleration, env_force
def simulate_force_control_task():
"""模拟力控制任务(如抛光)"""
# 初始化控制器
adaptive_controller = AdaptiveComplianceController()
hybrid_controller = HybridForcePositionController()
simulator = ForceControlSimulator()
# 任务参数
desired_force = np.array([0, 0, 5.0]) # 期望5N向下力
desired_trajectory = []
for t in np.arange(0, 5, 0.01):
x = 0.3 + 0.1 * np.sin(t)
y = 0.2 + 0.1 * np.cos(t)
z = 0.5 # 保持高度
desired_trajectory.append([x, y, z])
desired_trajectory = np.array(desired_trajectory)
# 记录数据
positions = [np.array([0.3, 0.2, 0.7])]
velocities = [np.zeros(3)]
actual_forces = [np.zeros(3)]
impedance_k = [adaptive_controller.impedance.K]
impedance_b = [adaptive_controller.impedance.B]
for i in range(len(desired_trajectory)):
current_pos = positions[-1]
current_vel = velocities[-1]
actual_force = actual_forces[-1]
# 顺应控制
adaptive_force = adaptive_controller.compute_control(
desired_force, actual_force,
desired_trajectory[i], current_pos, current_vel
)
# 混合控制(仅在Z轴力控,XY轴位控)
total_force, force_comp, pos_comp = hybrid_controller.control(
desired_force, actual_force,
desired_trajectory[i], current_pos, current_vel
)
# 仿真步进
accel, env_force = simulator.simulate_contact_dynamics(current_pos, total_force)
new_vel = current_vel + accel * simulator.dt
new_pos = current_pos + new_vel * simulator.dt
positions.append(new_pos)
velocities.append(new_vel)
actual_forces.append(env_force)
impedance_k.append(adaptive_controller.impedance.K)
impedance_b.append(adaptive_controller.impedance.B)
return {
'positions': np.array(positions),
'forces': np.array(actual_forces),
'desired_forces': np.tile(desired_force, (len(actual_forces), 1)),
'desired_positions': desired_trajectory,
'K': np.array(impedance_k),
'B': np.array(impedance_b)
}
def visualize_force_control(results: dict):
"""可视化力控制结果"""
fig = plt.figure(figsize=(18, 12))
time_axis = np.arange(len(results['forces'])) * 0.01
# 1. 力跟踪
ax1 = fig.add_subplot(231)
ax1.plot(time_axis, results['forces'][:, 2], label='Actual Force', linewidth=2, color='#2E86AB')
ax1.plot(time_axis, results['desired_forces'][:, 2], '--', label='Desired Force',
linewidth=2, color='#C73E1D')
ax1.fill_between(time_axis,
results['forces'][:, 2],
results['desired_forces'][:, 2],
alpha=0.3, color='gray')
ax1.set_xlabel('Time (s)')
ax1.set_ylabel('Force Z (N)')
ax1.set_title('Force Tracking Performance', fontsize=12, fontweight='bold')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 顺应参数自适应
ax2 = fig.add_subplot(232)
ax2.plot(time_axis, results['K'], label='Stiffness K', linewidth=2, color='#F18F01')
ax2.plot(time_axis, results['B'], label='Damping B', linewidth=2, color='#A23B72')
ax2.set_xlabel('Time (s)')
ax2.set_ylabel('Parameter Value')
ax2.set_title('Adaptive Impedance Parameters', fontsize=12, fontweight='bold')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. 3D轨迹(显示接触面)
ax3 = fig.add_subplot(233, projection='3d')
ax3.plot(results['positions'][:, 0],
results['positions'][:, 1],
results['positions'][:, 2],
linewidth=2, label='Actual Path', color='#2E86AB')
ax3.plot(results['desired_positions'][:, 0],
results['desired_positions'][:, 1],
results['desired_positions'][:, 2],
'--', linewidth=2, label='Desired Path', color='#C73E1D')
# 绘制接触平面
xx, yy = np.meshgrid(np.linspace(0.2, 0.4, 10), np.linspace(0.1, 0.3, 10))
zz = np.full_like(xx, 0.5)
ax3.plot_surface(xx, yy, zz, alpha=0.3, color='gray', label='Contact Surface')
ax3.set_xlabel('X (m)')
ax3.set_ylabel('Y (m)')
ax3.set_zlabel('Z (m)')
ax3.set_title('Hybrid Force/Position Control Path', fontsize=12, fontweight='bold')
ax3.legend()
# 4. 力-位置曲线(显示顺应性)
ax4 = fig.add_subplot(234)
z_positions = results['positions'][:, 2]
z_forces = results['forces'][:, 2]
ax4.scatter(z_positions, z_forces, c=time_axis, cmap='viridis', s=20, alpha=0.6)
ax4.plot(z_positions, z_forces, alpha=0.3, color='gray')
ax4.axvline(x=0.5, color='red', linestyle='--', label='Contact Surface')
ax4.set_xlabel('Position Z (m)')
ax4.set_ylabel('Force Z (N)')
ax4.set_title('Force-Position Relationship (Compliance)', fontsize=12, fontweight='bold')
ax4.legend()
cbar = plt.colorbar(ax4.collections[0], ax=ax4)
cbar.set_label('Time (s)')
# 5. 跟踪误差分析
ax5 = fig.add_subplot(235)
force_error = np.linalg.norm(results['forces'] - results['desired_forces'], axis=1)
position_error = np.linalg.norm(results['positions'][1:] - results['desired_positions'], axis=1)
ax5.plot(time_axis, force_error, label='Force Error', linewidth=2, color='#2E86AB')
ax5_twin = ax5.twinx()
ax5_twin.plot(time_axis[1:], position_error, label='Position Error',
linewidth=2, color='#F18F01', linestyle='--')
ax5.set_xlabel('Time (s)')
ax5.set_ylabel('Force Error (N)', color='#2E86AB')
ax5_twin.set_ylabel('Position Error (m)', color='#F18F01')
ax5.set_title('Tracking Errors', fontsize=12, fontweight='bold')
ax5.grid(True, alpha=0.3)
# 合并图例
lines1, labels1 = ax5.get_legend_handles_labels()
lines2, labels2 = ax5_twin.get_legend_handles_labels()
ax5.legend(lines1 + lines2, labels1 + labels2, loc='upper right')
# 6. 控制模式示意图
ax6 = fig.add_subplot(236)
ax6.axis('off')
ax6.set_xlim(0, 10)
ax6.set_ylim(0, 10)
# 绘制混合控制示意图
box1 = FancyBboxPatch((0.5, 6), 3, 2, boxstyle="round,pad=0.1",
facecolor='#2E86AB', edgecolor='black', linewidth=2)
box2 = FancyBboxPatch((0.5, 3), 3, 2, boxstyle="round,pad=0.1",
facecolor='#F18F01', edgecolor='black', linewidth=2)
box3 = FancyBboxPatch((0.5, 0.5), 3, 2, boxstyle="round,pad=0.1",
facecolor='#A23B72', edgecolor='black', linewidth=2)
ax6.add_patch(box1)
ax6.add_patch(box2)
ax6.add_patch(box3)
ax6.text(2, 7, 'Position Control\n(X, Y axes)', ha='center', va='center',
fontsize=11, fontweight='bold', color='white')
ax6.text(2, 4, 'Force Control\n(Z axis)', ha='center', va='center',
fontsize=11, fontweight='bold', color='white')
ax6.text(2, 1.5, 'Adaptive Compliance\n(Online Learning)', ha='center', va='center',
fontsize=11, fontweight='bold', color='white')
# 箭头
ax6.arrow(4, 7, 1.5, 0, head_width=0.3, head_length=0.3, fc='black', ec='black')
ax6.arrow(4, 4, 1.5, 0, head_width=0.3, head_length=0.3, fc='black', ec='black')
ax6.arrow(4, 1.5, 1.5, 0, head_width=0.3, head_length=0.3, fc='black', ec='black')
ax6.text(6, 7, 'Trajectory Tracking', fontsize=10, va='center')
ax6.text(6, 4, 'Contact Force', fontsize=10, va='center')
ax6.text(6, 1.5, 'Environment Adaptation', fontsize=10, va='center')
ax6.set_title('Hybrid Control Architecture', fontsize=12, fontweight='bold')
plt.suptitle('Closed-Loop Force Control with Learning', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('force_control_learning.png', dpi=150, bbox_inches='tight')
print("可视化结果已保存至 force_control_learning.png")
plt.show()
def main():
print("初始化力控制系统...")
print("运行顺应控制与混合控制模拟...")
results = simulate_force_control_task()
print("生成可视化...")
visualize_force_control(results)
# 计算性能指标
force_rmse = np.sqrt(np.mean((results['forces'] - results['desired_forces'])**2))
print(f"\n力控制RMSE: {force_rmse:.3f} N")
print(f"最终刚度K: {results['K'][-1]:.1f} N/m")
print(f"最终阻尼B: {results['B'][-1]:.1f} Ns/m")
if __name__ == "__main__":
main()
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)