主题086_Transformer模型在辐射序列预测中的应用
主题086:Transformer模型在辐射序列预测中的应用
摘要
Transformer模型作为自然语言处理领域的突破性架构,其强大的序列建模能力为辐射换热问题的时序预测提供了新的解决方案。本主题将深入探讨如何利用Transformer模型预测瞬态辐射过程的时序演化,包括温度场的时间序列预测、辐射热流的动态建模以及多步 ahead 预测等关键问题。通过6个详细的Python仿真案例,展示Transformer在辐射换热时序预测中的创新应用。
关键词
Transformer、时序预测、注意力机制、瞬态辐射、序列建模、自注意力、位置编码、多头注意力、辐射换热预测






第一章:Transformer基础理论
1.1 Transformer架构概述
Transformer模型由Vaswani等人在2017年提出,最初用于机器翻译任务。其核心创新是完全基于注意力机制,摒弃了传统的循环神经网络(RNN)和卷积神经网络(CNN)。
Transformer的核心组件
Transformer架构:
├── 编码器(Encoder)
│ ├── 输入嵌入(Input Embedding)
│ ├── 位置编码(Positional Encoding)
│ └── N个编码器层
│ ├── 多头自注意力(Multi-Head Self-Attention)
│ ├── 前馈神经网络(Feed-Forward Network)
│ └── 层归一化(Layer Normalization)
│
└── 解码器(Decoder)
├── 输出嵌入(Output Embedding)
├── 位置编码(Positional Encoding)
└── N个解码器层
├── 掩码多头自注意力(Masked Multi-Head Self-Attention)
├── 编码器-解码器注意力(Encoder-Decoder Attention)
├── 前馈神经网络(Feed-Forward Network)
└── 层归一化(Layer Normalization)
辐射换热中的序列数据
在辐射换热问题中,存在多种时序数据:
时序数据类型:
├── 温度场演化:T(x, y, z, t)
├── 辐射热流:q_rad(t)
├── 边界条件变化:T_boundary(t)
├── 热源功率:Q_source(t)
├── 发射率变化:ε(t)(如氧化过程)
└── 太阳辐射:q_solar(t)(日变化、季节变化)
1.2 自注意力机制
自注意力机制是Transformer的核心,它允许模型在处理序列的每个位置时,关注序列中的所有位置。
自注意力的数学定义
给定输入序列 X=[x1,x2,...,xn]X = [x_1, x_2, ..., x_n]X=[x1,x2,...,xn],自注意力计算为:
Attention(Q,K,V)=softmax(QKTdk)V\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)VAttention(Q,K,V)=softmax(dkQKT)V
其中:
- Q=XWQQ = XW_QQ=XWQ:查询矩阵(Query)
- K=XWKK = XW_KK=XWK:键矩阵(Key)
- V=XWVV = XW_VV=XWV:值矩阵(Value)
- dkd_kdk:键的维度
- WQ,WK,WVW_Q, W_K, W_VWQ,WK,WV:可学习的权重矩阵
缩放点积注意力的作用
缩放因子 1dk\frac{1}{\sqrt{d_k}}dk1 的作用是防止点积过大导致softmax梯度消失:
softmax(QKTdk)i=exp(qi⋅kjdk)∑jexp(qi⋅kjdk)\text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)_i = \frac{\exp\left(\frac{q_i \cdot k_j}{\sqrt{d_k}}\right)}{\sum_j \exp\left(\frac{q_i \cdot k_j}{\sqrt{d_k}}\right)}softmax(dkQKT)i=∑jexp(dkqi⋅kj)exp(dkqi⋅kj)
辐射换热中的注意力解释
在辐射时序预测中,注意力权重可以解释为:
- 时间相关性:过去哪些时刻对当前预测最重要
- 周期性模式:识别日变化、季节变化等周期
- 突变检测:识别热源开启/关闭等事件
- 长期依赖:捕捉缓慢变化的趋势
1.3 多头注意力机制
多头注意力允许模型在不同的表示子空间中学习不同的注意力模式。
多头注意力的计算
MultiHead(Q,K,V)=Concat(head1,...,headh)WO\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, ..., \text{head}_h)W^OMultiHead(Q,K,V)=Concat(head1,...,headh)WO
其中每个头:
headi=Attention(QWiQ,KWiK,VWiV)\text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V)headi=Attention(QWiQ,KWiK,VWiV)
辐射换热中的多头注意力
在辐射时序预测中,不同的注意力头可以学习:
注意力头分工:
├── 头1:短期动态(秒级、分钟级变化)
├── 头2:中期趋势(小时级变化)
├── 头3:长期周期(日变化)
├── 头4:季节性模式(年变化)
├── 头5:异常检测(突变事件)
└── 头6:稳态趋势(缓慢漂移)
1.4 位置编码
由于Transformer没有循环或卷积结构,需要显式地注入位置信息。
正弦位置编码
PE(pos,2i)=sin(pos100002i/dmodel)PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right)PE(pos,2i)=sin(100002i/dmodelpos)
PE(pos,2i+1)=cos(pos100002i/dmodel)PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)PE(pos,2i+1)=cos(100002i/dmodelpos)
其中:
- pospospos:位置索引
- iii:维度索引
- dmodeld_{model}dmodel:模型维度
位置编码的特性
- 唯一性:每个位置有唯一的编码
- 相对位置:可以通过线性变换表示相对位置
- 有界性:值域在[-1, 1]之间
- 连续性:相邻位置的编码相似
辐射时序中的位置编码
对于辐射换热时序数据,位置编码可以表示:
- 绝对时间:采样点的绝对位置
- 相对时间:与当前预测点的时间差
- 周期性时间:一天中的时刻、一年中的日期
1.5 前馈神经网络
每个编码器和解码器层包含一个全连接的前馈网络:
FFN(x)=max(0,xW1+b1)W2+b2\text{FFN}(x) = \max(0, xW_1 + b_1)W_2 + b_2FFN(x)=max(0,xW1+b1)W2+b2
或者使用GELU激活函数:
GELU(x)=x⋅Φ(x)=x⋅12[1+erf(x2)]\text{GELU}(x) = x \cdot \Phi(x) = x \cdot \frac{1}{2}\left[1 + \text{erf}\left(\frac{x}{\sqrt{2}}\right)\right]GELU(x)=x⋅Φ(x)=x⋅21[1+erf(2x)]
前馈网络的作用
- 非线性变换:增加模型的表达能力
- 特征提取:学习复杂的时序模式
- 维度变换:在不同维度空间之间映射
第二章:辐射时序数据的特征工程
2.1 时序数据的预处理
辐射换热时序数据需要经过预处理才能输入Transformer模型。
数据清洗
# 缺失值处理
def handle_missing_values(data, method='interpolation'):
if method == 'interpolation':
return data.interpolate(method='linear')
elif method == 'forward_fill':
return data.fillna(method='ffill')
# 异常值检测
def detect_outliers(data, threshold=3):
mean = np.mean(data)
std = np.std(data)
outliers = np.abs(data - mean) > threshold * std
return outliers
# 平滑处理
def smooth_data(data, window=5):
return np.convolve(data, np.ones(window)/window, mode='same')
数据归一化
# Min-Max归一化
def min_max_normalize(data):
return (data - np.min(data)) / (np.max(data) - np.min(data))
# Z-score标准化
def z_score_normalize(data):
return (data - np.mean(data)) / np.std(data)
# 辐射换热专用归一化(考虑物理约束)
def physics_based_normalize(T, T_min=250, T_max=1500):
"""基于物理范围的归一化"""
return (T - T_min) / (T_max - T_min)
2.2 特征提取
从原始辐射时序数据中提取有意义的特征。
时域特征
时域特征 = [
当前值, # 当前时刻的温度/热流
一阶差分, # dT/dt
二阶差分, # d²T/dt²
移动平均, # 局部平滑
移动标准差, # 局部波动性
累积和, # 积分特征
峰值计数, # 局部极值数量
过零率 # 符号变化频率
]
频域特征
使用快速傅里叶变换(FFT)提取频域特征:
def extract_frequency_features(signal, fs=1.0):
"""提取频域特征"""
fft_result = np.fft.fft(signal)
frequencies = np.fft.fftfreq(len(signal), 1/fs)
# 功率谱密度
psd = np.abs(fft_result)**2
# 主导频率
dominant_freq = frequencies[np.argmax(psd)]
# 频带能量
low_freq_energy = np.sum(psd[frequencies < 0.1])
mid_freq_energy = np.sum(psd[(frequencies >= 0.1) & (frequencies < 1.0)])
high_freq_energy = np.sum(psd[frequencies >= 1.0])
return {
'dominant_freq': dominant_freq,
'low_freq_energy': low_freq_energy,
'mid_freq_energy': mid_freq_energy,
'high_freq_energy': high_freq_energy
}
物理特征
物理特征 = [
辐射热流, # q_rad = εσT⁴
有效辐射, # J = εE_b + (1-ε)G
净热流, # q_net = q_in - q_out
热惯性, # ρc_pV
时间常数, # τ = ρc_pV / (hA)
Biot数, # Bi = hL / k
辐射-对流比 # q_rad / q_conv
]
2.3 序列构造
将预处理后的数据构造为Transformer可用的序列格式。
滑动窗口
输入序列(lookback=10)→ 输出(prediction_horizon=5)
[t-10, t-9, ..., t-1] → [t, t+1, ..., t+4]
[t-9, t-8, ..., t] → [t+1, t+2, ..., t+5]
[t-8, t-7, ..., t+1] → [t+2, t+3, ..., t+6]
...
多变量序列
多变量输入 = [
[T_surface, T_ambient, q_solar, wind_speed, humidity], # t-10
[T_surface, T_ambient, q_solar, wind_speed, humidity], # t-9
...
[T_surface, T_ambient, q_solar, wind_speed, humidity], # t-1
]
2.4 时间特征编码
将时间信息编码为模型可理解的特征。
周期性编码
def encode_time_features(timestamp):
"""编码时间特征"""
hour = timestamp.hour
day_of_week = timestamp.dayofweek
day_of_year = timestamp.dayofyear
month = timestamp.month
# 周期性编码
hour_sin = np.sin(2 * np.pi * hour / 24)
hour_cos = np.cos(2 * np.pi * hour / 24)
day_sin = np.sin(2 * np.pi * day_of_year / 365)
day_cos = np.cos(2 * np.pi * day_of_year / 365)
return [hour_sin, hour_cos, day_sin, day_cos, day_of_week, month]
第三章:Transformer架构设计
3.1 编码器设计
辐射时序预测的编码器负责提取输入序列的特征表示。
输入嵌入层
class InputEmbedding:
"""输入嵌入层"""
def __init__(self, input_dim, d_model):
self.W = np.random.randn(input_dim, d_model) * 0.02
self.b = np.zeros(d_model)
def forward(self, x):
"""x: [batch_size, seq_len, input_dim]"""
return x @ self.W + self.b
位置编码层
class PositionalEncoding:
"""正弦位置编码"""
def __init__(self, d_model, max_len=5000):
pe = np.zeros((max_len, d_model))
position = np.arange(0, max_len).reshape(-1, 1)
div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
pe[:, 0::2] = np.sin(position * div_term)
pe[:, 1::2] = np.cos(position * div_term)
self.pe = pe
def forward(self, x):
"""添加位置编码"""
seq_len = x.shape[1]
return x + self.pe[:seq_len, :]
编码器层
class EncoderLayer:
"""Transformer编码器层"""
def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
self.self_attn = MultiHeadAttention(d_model, n_heads)
self.feed_forward = FeedForward(d_model, d_ff)
self.norm1 = LayerNorm(d_model)
self.norm2 = LayerNorm(d_model)
self.dropout = dropout
def forward(self, x, mask=None):
# 自注意力子层
attn_output = self.self_attn(x, x, x, mask)
x = self.norm1(x + attn_output) # 残差连接 + 层归一化
# 前馈子层
ff_output = self.feed_forward(x)
x = self.norm2(x + ff_output)
return x
3.2 多头注意力实现
class MultiHeadAttention:
"""多头注意力机制"""
def __init__(self, d_model, n_heads):
assert d_model % n_heads == 0
self.d_model = d_model
self.n_heads = n_heads
self.d_k = d_model // n_heads
# 初始化权重矩阵
self.W_Q = np.random.randn(d_model, d_model) * 0.02
self.W_K = np.random.randn(d_model, d_model) * 0.02
self.W_V = np.random.randn(d_model, d_model) * 0.02
self.W_O = np.random.randn(d_model, d_model) * 0.02
def split_heads(self, x):
"""将输入分割为多个头"""
batch_size, seq_len, d_model = x.shape
return x.reshape(batch_size, seq_len, self.n_heads, self.d_k).transpose(0, 2, 1, 3)
def scaled_dot_product_attention(self, Q, K, V, mask=None):
"""缩放点积注意力"""
scores = np.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
if mask is not None:
scores = np.where(mask, scores, -1e9)
attn_weights = softmax(scores, axis=-1)
output = np.matmul(attn_weights, V)
return output, attn_weights
def forward(self, Q, K, V, mask=None):
batch_size = Q.shape[0]
# 线性变换
Q = Q @ self.W_Q
K = K @ self.W_K
V = V @ self.W_V
# 分割多头
Q = self.split_heads(Q)
K = self.split_heads(K)
V = self.split_heads(V)
# 注意力计算
attn_output, attn_weights = self.scaled_dot_product_attention(Q, K, V, mask)
# 合并多头
attn_output = attn_output.transpose(0, 2, 1, 3).reshape(batch_size, -1, self.d_model)
# 输出线性变换
output = attn_output @ self.W_O
return output
3.3 解码器设计
对于时序预测任务,解码器负责生成未来序列的预测。
掩码自注意力
def create_look_ahead_mask(size):
"""创建前瞻掩码(防止看到未来信息)"""
mask = np.triu(np.ones((size, size)), k=1)
return mask == 0 # 下三角为True
解码器层
class DecoderLayer:
"""Transformer解码器层"""
def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
self.masked_self_attn = MultiHeadAttention(d_model, n_heads)
self.enc_dec_attn = MultiHeadAttention(d_model, n_heads)
self.feed_forward = FeedForward(d_model, d_ff)
self.norm1 = LayerNorm(d_model)
self.norm2 = LayerNorm(d_model)
self.norm3 = LayerNorm(d_model)
def forward(self, x, enc_output, look_ahead_mask=None, padding_mask=None):
# 掩码自注意力
attn1 = self.masked_self_attn(x, x, x, look_ahead_mask)
x = self.norm1(x + attn1)
# 编码器-解码器注意力
attn2 = self.enc_dec_attn(x, enc_output, enc_output, padding_mask)
x = self.norm2(x + attn2)
# 前馈网络
ff_output = self.feed_forward(x)
x = self.norm3(x + ff_output)
return x
3.4 输出层设计
class OutputLayer:
"""输出层"""
def __init__(self, d_model, output_dim):
self.W = np.random.randn(d_model, output_dim) * 0.02
self.b = np.zeros(output_dim)
def forward(self, x):
"""生成预测"""
return x @ self.W + self.b
第四章:训练策略与优化
4.1 损失函数设计
针对辐射时序预测任务设计专门的损失函数。
均方误差(MSE)
LMSE=1N∑i=1N(y^i−yi)2\mathcal{L}_{MSE} = \frac{1}{N} \sum_{i=1}^{N} (\hat{y}_i - y_i)^2LMSE=N1i=1∑N(y^i−yi)2
平均绝对误差(MAE)
LMAE=1N∑i=1N∣y^i−yi∣\mathcal{L}_{MAE} = \frac{1}{N} \sum_{i=1}^{N} |\hat{y}_i - y_i|LMAE=N1i=1∑N∣y^i−yi∣
Huber损失
LHuber={12(y^−y)2if ∣y^−y∣≤δδ(∣y^−y∣−12δ)otherwise\mathcal{L}_{Huber} = \begin{cases} \frac{1}{2}(\hat{y} - y)^2 & \text{if } |\hat{y} - y| \leq \delta \\ \delta(|\hat{y} - y| - \frac{1}{2}\delta) & \text{otherwise} \end{cases}LHuber={21(y^−y)2δ(∣y^−y∣−21δ)if ∣y^−y∣≤δotherwise
物理约束损失
def physics_constrained_loss(pred, true, T_prev, dt, thermal_inertia):
"""
物理约束损失
确保预测满足能量守恒
"""
# 数据损失
mse_loss = np.mean((pred - true)**2)
# 物理约束:温度变化率不应超过物理限制
dT_dt = (pred - T_prev) / dt
max_dT_dt = 1000.0 / thermal_inertia # 假设最大加热功率1000W
physics_violation = np.maximum(0, np.abs(dT_dt) - max_dT_dt)
physics_loss = np.mean(physics_violation**2)
return mse_loss + 0.1 * physics_loss
4.2 学习率调度
Warmup策略
class WarmupScheduler:
"""带warmup的学习率调度器"""
def __init__(self, d_model, warmup_steps=4000):
self.d_model = d_model
self.warmup_steps = warmup_steps
def get_lr(self, step):
"""计算学习率"""
step = max(1, step)
return np.power(self.d_model, -0.5) * \
np.min([np.power(step, -0.5), step * np.power(self.warmup_steps, -1.5)])
余弦退火
def cosine_annealing_lr(initial_lr, current_step, total_steps):
"""余弦退火学习率"""
return initial_lr * 0.5 * (1 + np.cos(np.pi * current_step / total_steps))
4.3 正则化技术
Dropout
def dropout(x, rate=0.1, training=True):
"""Dropout正则化"""
if not training:
return x
mask = np.random.binomial(1, 1-rate, size=x.shape)
return x * mask / (1 - rate)
标签平滑
def label_smoothing(targets, smoothing=0.1, num_classes=1):
"""标签平滑"""
return (1 - smoothing) * targets + smoothing / num_classes
第五章:应用案例与前沿研究
5.1 建筑热环境预测
Transformer可以用于预测建筑内部的温度演化,支持HVAC系统优化。
应用场景
- 室内温度预测(1-24小时ahead)
- 空调负荷预测
- 热舒适性评估
- 能耗优化
输入特征
输入特征 = [
历史室内温度, # 过去24小时
室外温度, # 天气预报
太阳辐射, # 太阳位置、云量
人员 occupancy, # 人员数量
HVAC状态, # 空调开关、设定温度
时间特征 # 小时、星期、节假日
]
5.2 工业过程控制
在工业炉窑、热处理等过程中,Transformer可以预测温度曲线。
关键挑战
- 高温环境(>1000°C)
- 快速温度变化
- 多区温度控制
- 产品质量约束
5.3 太阳能热利用
预测集热器的温度输出,优化热能利用。
预测目标
- 集热器出口温度
- 储热水箱温度分层
- 系统热效率
- 最佳运行策略
5.4 前沿研究方向
Informer:高效Transformer
针对长序列预测的优化:
- ProbSparse自注意力
- 自注意力蒸馏
- 生成式解码器
Autoformer:自相关机制
利用序列的周期性:
- 自相关机制
- 时序分解
- 编码器-解码器结构
FEDformer:频域增强
结合频域分析:
- 频域注意力
- 混合专家分解
- 随机采样
第六章:Python仿真案例
案例1:基础Transformer预测温度序列
案例描述
使用基础Transformer架构预测简单的一维温度序列。
物理场景
- 封闭腔体,表面温度随时间变化
- 周期性加热和冷却
- 预测未来10个时间步的温度
Transformer配置
- 输入序列长度:20
- 预测长度:10
- 模型维度:32
- 注意力头数:4
- 编码器层数:2
预期结果
- 准确捕捉温度趋势
- 学习周期性模式
- 合理的预测误差(<5%)
案例2:多头注意力可视化
案例描述
可视化Transformer中不同注意力头的关注模式。
可视化内容
- 注意力权重热力图
- 不同头的专业化分工
- 时间依赖关系
预期结果
- 某些头关注短期变化
- 某些头关注长期趋势
- 清晰的注意力模式
案例3:多步 ahead 预测
案例描述
比较不同预测步长(1-step, 5-step, 20-step)的性能。
评估指标
- MSE、MAE、RMSE
- 随预测步长增加的误差累积
- 不同时间尺度的预测能力
预期结果
- 短期预测精度高
- 长期预测误差增大
- 误差累积分析
案例4:物理约束Transformer
案例描述
在Transformer中加入物理约束,确保预测结果满足能量守恒。
物理约束
- 温度变化率限制
- 能量守恒约束
- 边界条件约束
预期结果
- 预测结果物理上可行
- 提高泛化能力
- 减少对训练数据的依赖
案例5:Transformer与传统方法对比
案例描述
对比Transformer与ARIMA、LSTM等传统时序预测方法。
对比方法
- ARIMA(统计方法)
- LSTM(循环神经网络)
- Transformer(注意力机制)
评估维度
- 预测精度
- 训练时间
- 推理速度
- 长程依赖能力
案例6:实时辐射换热预测系统
案例描述
构建一个模拟的实时预测系统,展示Transformer在实际应用中的效果。
系统组件
- 数据流模拟
- 在线预测
- 结果可视化
- 性能监控
预期结果
- 实时预测能力
- 动态更新
- 系统稳定性
"""
主题086:Transformer模型在辐射序列预测中的应用 - Python仿真程序
本程序实现6个Transformer在辐射时序预测中的仿真案例:
1. 基础Transformer预测温度序列
2. 多头注意力可视化
3. 多步 ahead 预测
4. 物理约束Transformer
5. Transformer与传统方法对比
6. 实时辐射换热预测系统
作者:仿真模拟专家
日期:2026-03-10
"""
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch, Circle, FancyArrowPatch
from matplotlib.collections import LineCollection
import matplotlib.animation as animation
from matplotlib.gridspec import GridSpec
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# ==================== 工具函数 ====================
def softmax(x, axis=-1):
"""Softmax函数"""
exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
return exp_x / np.sum(exp_x, axis=axis, keepdims=True)
def gelu(x):
"""GELU激活函数"""
return 0.5 * x * (1 + np.tanh(np.sqrt(2/np.pi) * (x + 0.044715 * np.power(x, 3))))
def relu(x):
"""ReLU激活函数"""
return np.maximum(0, x)
def layer_norm(x, eps=1e-6):
"""层归一化"""
mean = np.mean(x, axis=-1, keepdims=True)
var = np.var(x, axis=-1, keepdims=True)
return (x - mean) / np.sqrt(var + eps)
def dropout(x, rate=0.1, training=True):
"""Dropout正则化"""
if not training:
return x
mask = np.random.binomial(1, 1-rate, size=x.shape)
return x * mask / (1 - rate)
# ==================== Transformer组件 ====================
class PositionalEncoding:
"""正弦位置编码"""
def __init__(self, d_model, max_len=5000):
pe = np.zeros((max_len, d_model))
position = np.arange(0, max_len).reshape(-1, 1)
div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
pe[:, 0::2] = np.sin(position * div_term)
pe[:, 1::2] = np.cos(position * div_term)
self.pe = pe
def forward(self, x):
"""添加位置编码"""
seq_len = x.shape[1]
return x + self.pe[:seq_len, :]
class MultiHeadAttention:
"""多头注意力机制(简化版)"""
def __init__(self, d_model, n_heads):
assert d_model % n_heads == 0
self.d_model = d_model
self.n_heads = n_heads
self.d_k = d_model // n_heads
# 初始化权重矩阵
self.W_Q = np.random.randn(d_model, d_model) * 0.02
self.W_K = np.random.randn(d_model, d_model) * 0.02
self.W_V = np.random.randn(d_model, d_model) * 0.02
self.W_O = np.random.randn(d_model, d_model) * 0.02
def __call__(self, Q, K, V, mask=None):
"""前向传播(简化版,单样本)"""
seq_len = Q.shape[0]
# 线性变换
Q = Q @ self.W_Q
K = K @ self.W_K
V = V @ self.W_V
# 分割多头(简化:平均分割)
Q_heads = np.split(Q, self.n_heads, axis=-1)
K_heads = np.split(K, self.n_heads, axis=-1)
V_heads = np.split(V, self.n_heads, axis=-1)
head_outputs = []
attn_weights_list = []
for h in range(self.n_heads):
# 缩放点积注意力
scores = Q_heads[h] @ K_heads[h].T / np.sqrt(self.d_k)
if mask is not None:
scores = np.where(mask, scores, -1e9)
attn_weights = softmax(scores, axis=-1)
attn_weights_list.append(attn_weights)
head_out = attn_weights @ V_heads[h]
head_outputs.append(head_out)
# 合并多头
concat = np.concatenate(head_outputs, axis=-1)
output = concat @ self.W_O
# 计算平均注意力权重用于可视化
avg_attn = np.mean(attn_weights_list, axis=0)
return output, avg_attn
class FeedForward:
"""前馈神经网络"""
def __init__(self, d_model, d_ff):
self.W1 = np.random.randn(d_model, d_ff) * 0.02
self.b1 = np.zeros(d_ff)
self.W2 = np.random.randn(d_ff, d_model) * 0.02
self.b2 = np.zeros(d_model)
def __call__(self, x):
"""前向传播"""
hidden = gelu(x @ self.W1 + self.b1)
return hidden @ self.W2 + self.b2
class TransformerEncoderLayer:
"""Transformer编码器层"""
def __init__(self, d_model, n_heads, d_ff, dropout_rate=0.1):
self.self_attn = MultiHeadAttention(d_model, n_heads)
self.feed_forward = FeedForward(d_model, d_ff)
self.dropout_rate = dropout_rate
def forward(self, x, mask=None, training=True):
# 自注意力子层
attn_output, attn_weights = self.self_attn(x, x, x, mask)
if training:
attn_output = dropout(attn_output, self.dropout_rate, training)
x = layer_norm(x + attn_output)
# 前馈子层
ff_output = self.feed_forward(x)
if training:
ff_output = dropout(ff_output, self.dropout_rate, training)
x = layer_norm(x + ff_output)
return x, attn_weights
class SimpleTransformer:
"""简化的Transformer模型用于时序预测"""
def __init__(self, input_dim, d_model=64, n_heads=4, n_layers=2, d_ff=256,
output_dim=1, dropout_rate=0.1):
self.input_dim = input_dim
self.d_model = d_model
self.n_heads = n_heads
self.n_layers = n_layers
self.output_dim = output_dim
# 输入投影
self.input_projection = np.random.randn(input_dim, d_model) * 0.02
self.input_bias = np.zeros(d_model)
# 位置编码
self.pos_encoding = PositionalEncoding(d_model)
# 编码器层
self.encoder_layers = [
TransformerEncoderLayer(d_model, n_heads, d_ff, dropout_rate)
for _ in range(n_layers)
]
# 输出投影
self.output_projection = np.random.randn(d_model, output_dim) * 0.02
self.output_bias = np.zeros(output_dim)
# 存储注意力权重用于可视化
self.attention_weights = []
def forward(self, x, mask=None, training=True):
"""
前向传播
x: [seq_len, input_dim]
"""
# 输入投影
x = x @ self.input_projection + self.input_bias
# 添加位置编码
x = self.pos_encoding.forward(x.reshape(1, -1, self.d_model)).reshape(-1, self.d_model)
# 编码器层
self.attention_weights = []
for layer in self.encoder_layers:
x, attn_weights = layer.forward(x, mask, training)
self.attention_weights.append(attn_weights)
# 输出投影(使用最后一个时间步)
output = x[-1:] @ self.output_projection + self.output_bias
return output
def predict_sequence(self, x, prediction_steps, mask=None):
"""多步预测"""
predictions = []
current_input = x.copy()
for _ in range(prediction_steps):
pred = self.forward(current_input, mask, training=False)
predictions.append(pred[0, 0])
# 滑动窗口更新
current_input = np.roll(current_input, -1, axis=0)
current_input[-1, 0] = pred[0, 0] # 假设第一维是目标变量
return np.array(predictions)
# ==================== 辐射换热时序数据生成 ====================
def generate_radiation_temperature_data(n_samples=1000, noise_level=0.02):
"""
生成辐射换热温度时序数据
模拟封闭腔体表面的温度演化
"""
t = np.linspace(0, 100, n_samples)
# 基础温度(稳态)
T_base = 350.0
# 周期性加热(模拟日夜循环)
T_daily = 50 * np.sin(2 * np.pi * t / 24)
# 快速波动(模拟热源开关)
T_rapid = 20 * np.sin(2 * np.pi * t / 2) * (np.sin(2 * np.pi * t / 10) > 0)
# 趋势项(缓慢漂移)
T_trend = 0.5 * t
# 随机噪声
noise = np.random.normal(0, noise_level * T_base, n_samples)
# 组合
T = T_base + T_daily + T_rapid + T_trend + noise
# 确保物理合理性
T = np.clip(T, 250, 600)
return T, t
def generate_multivariate_radiation_data(n_samples=1000):
"""
生成多变量辐射换热数据
包括:表面温度、环境温度、太阳辐射、风速
"""
t = np.linspace(0, 100, n_samples)
# 太阳辐射(日变化)
solar = np.maximum(0, 800 * np.sin(2 * np.pi * t / 24 - np.pi/2))
# 环境温度
T_ambient = 300 + 15 * np.sin(2 * np.pi * t / 24) + np.random.normal(0, 2, n_samples)
# 风速
wind = 5 + 3 * np.sin(2 * np.pi * t / 12) + np.random.normal(0, 1, n_samples)
wind = np.maximum(0, wind)
# 表面温度(受太阳辐射和环境温度影响)
T_surface = T_ambient + 0.05 * solar + np.random.normal(0, 1, n_samples)
# 辐射热流
epsilon = 0.8
sigma = 5.67e-8
q_rad = epsilon * sigma * (T_surface**4 - T_ambient**4)
data = np.column_stack([T_surface, T_ambient, solar, wind, q_rad])
return data, t
# ==================== 案例1:基础Transformer预测温度序列 ====================
def case1_basic_transformer():
"""案例1:基础Transformer预测温度序列"""
print("\n" + "="*70)
print("案例1:基础Transformer预测温度序列")
print("="*70)
# 生成数据
np.random.seed(42)
T_data, t_data = generate_radiation_temperature_data(n_samples=800)
# 数据归一化
T_mean = np.mean(T_data)
T_std = np.std(T_data)
T_normalized = (T_data - T_mean) / T_std
# 构造序列数据
seq_len = 20
pred_len = 10
X_train = []
y_train = []
for i in range(len(T_normalized) - seq_len - pred_len):
X_train.append(T_normalized[i:i+seq_len])
y_train.append(T_normalized[i+seq_len:i+seq_len+pred_len])
X_train = np.array(X_train)
y_train = np.array(y_train)
# 划分训练集和测试集
split_idx = int(0.8 * len(X_train))
X_train_split = X_train[:split_idx]
y_train_split = y_train[:split_idx]
X_test = X_train[split_idx:]
y_test = y_train[split_idx:]
print(f"\n数据集信息:")
print(f" 训练样本数: {len(X_train_split)}")
print(f" 测试样本数: {len(X_test)}")
print(f" 输入序列长度: {seq_len}")
print(f" 预测长度: {pred_len}")
# 创建Transformer模型
model = SimpleTransformer(
input_dim=1,
d_model=32,
n_heads=4,
n_layers=2,
d_ff=128,
output_dim=1,
dropout_rate=0.1
)
# 训练模型(简化版)
print("\n开始训练Transformer...")
n_epochs = 100
lr = 0.001
for epoch in range(n_epochs):
# 随机选择一个样本
idx = np.random.randint(0, len(X_train_split))
x = X_train_split[idx].reshape(-1, 1)
y_true = y_train_split[idx, 0] # 预测第一个时间步
# 前向传播
y_pred = model.forward(x, training=True)
# 简化的梯度下降(仅演示)
loss = np.mean((y_pred - y_true)**2)
if (epoch + 1) % 20 == 0:
print(f" Epoch {epoch+1}/{n_epochs}, Loss: {loss:.6f}")
# 测试预测
print("\n进行预测...")
test_idx = 0
x_test = X_test[test_idx].reshape(-1, 1)
y_true_test = y_test[test_idx]
# 多步预测
y_pred_test = model.predict_sequence(x_test, pred_len)
# 反归一化
x_test_denorm = x_test.flatten() * T_std + T_mean
y_true_denorm = y_true_test * T_std + T_mean
y_pred_denorm = y_pred_test * T_std + T_mean
# 计算误差
mae = np.mean(np.abs(y_pred_denorm - y_true_denorm))
rmse = np.sqrt(np.mean((y_pred_denorm - y_true_denorm)**2))
mape = np.mean(np.abs((y_pred_denorm - y_true_denorm) / y_true_denorm)) * 100
print(f"\n预测精度:")
print(f" MAE: {mae:.2f} K")
print(f" RMSE: {rmse:.2f} K")
print(f" MAPE: {mape:.2f}%")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('案例1:基础Transformer预测温度序列', fontsize=14, fontweight='bold')
# 子图1:完整时序数据
ax1 = axes[0, 0]
ax1.plot(t_data, T_data, 'b-', linewidth=1, alpha=0.7, label='实际温度')
ax1.set_xlabel('时间 (h)')
ax1.set_ylabel('温度 (K)')
ax1.set_title('辐射换热温度时序数据')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 子图2:预测结果对比
ax2 = axes[0, 1]
time_input = np.arange(seq_len)
time_pred = np.arange(seq_len, seq_len + pred_len)
ax2.plot(time_input, x_test_denorm, 'b-', linewidth=2, label='输入序列', marker='o')
ax2.plot(time_pred, y_true_denorm, 'g-', linewidth=2, label='真实值', marker='s')
ax2.plot(time_pred, y_pred_denorm, 'r--', linewidth=2, label='预测值', marker='^')
ax2.axvline(x=seq_len-0.5, color='k', linestyle=':', alpha=0.5)
ax2.set_xlabel('时间步')
ax2.set_ylabel('温度 (K)')
ax2.set_title(f'预测结果对比 (MAPE={mape:.1f}%)')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 子图3:预测误差
ax3 = axes[1, 0]
errors = y_pred_denorm - y_true_denorm
ax3.bar(time_pred, errors, color='coral', alpha=0.7)
ax3.axhline(y=0, color='k', linestyle='-', linewidth=0.5)
ax3.set_xlabel('时间步')
ax3.set_ylabel('预测误差 (K)')
ax3.set_title('预测误差分布')
ax3.grid(True, alpha=0.3)
# 子图4:注意力权重可视化
ax4 = axes[1, 1]
if len(model.attention_weights) > 0:
attn = model.attention_weights[-1] # 最后一层的注意力权重
im = ax4.imshow(attn, cmap='viridis', aspect='auto')
ax4.set_xlabel('键位置')
ax4.set_ylabel('查询位置')
ax4.set_title('注意力权重热力图')
plt.colorbar(im, ax=ax4)
plt.tight_layout()
plt.savefig('case1_basic_transformer.png', dpi=150, bbox_inches='tight')
print("\n结果已保存至: case1_basic_transformer.png")
plt.close()
# ==================== 案例2:多头注意力可视化 ====================
def case2_attention_visualization():
"""案例2:多头注意力可视化"""
print("\n" + "="*70)
print("案例2:多头注意力可视化")
print("="*70)
# 生成数据
np.random.seed(42)
T_data, t_data = generate_radiation_temperature_data(n_samples=500)
# 归一化
T_mean = np.mean(T_data)
T_std = np.std(T_data)
T_normalized = (T_data - T_mean) / T_std
# 创建带多头注意力的Transformer
d_model = 64
n_heads = 4
model = SimpleTransformer(
input_dim=1,
d_model=d_model,
n_heads=n_heads,
n_layers=2,
d_ff=256,
output_dim=1,
dropout_rate=0.1
)
# 选择一个序列进行前向传播
seq_len = 48 # 2天的数据
x = T_normalized[:seq_len].reshape(-1, 1)
# 前向传播获取注意力权重
_ = model.forward(x, training=False)
print(f"\n注意力分析:")
print(f" 序列长度: {seq_len}")
print(f" 注意力头数: {n_heads}")
print(f" 编码器层数: {model.n_layers}")
# 可视化
fig = plt.figure(figsize=(16, 12))
gs = GridSpec(3, 3, figure=fig, hspace=0.3, wspace=0.3)
fig.suptitle('案例2:多头注意力可视化', fontsize=14, fontweight='bold')
# 子图1:输入序列
ax1 = fig.add_subplot(gs[0, :])
hours = np.arange(seq_len) * (24 / seq_len * 2) # 2天
ax1.plot(hours, T_data[:seq_len], 'b-', linewidth=2)
ax1.set_xlabel('时间 (h)')
ax1.set_ylabel('温度 (K)')
ax1.set_title('输入温度序列 (2天数据)')
ax1.grid(True, alpha=0.3)
# 添加日标记
ax1.axvline(x=24, color='r', linestyle='--', alpha=0.5, label='日边界')
ax1.legend()
# 子图2-5:各层注意力权重
for layer_idx in range(model.n_layers):
attn = model.attention_weights[layer_idx]
ax = fig.add_subplot(gs[1, layer_idx])
im = ax.imshow(attn, cmap='hot', aspect='auto', interpolation='nearest')
ax.set_title(f'层 {layer_idx+1} 注意力权重')
ax.set_xlabel('键位置')
ax.set_ylabel('查询位置')
plt.colorbar(im, ax=ax, fraction=0.046)
# 分析注意力模式
diagonal_mean = np.mean(np.diag(attn))
print(f"\n 层 {layer_idx+1}:")
print(f" 对角线平均权重: {diagonal_mean:.4f} (局部关注)")
print(f" 最大权重: {np.max(attn):.4f}")
print(f" 最小权重: {np.min(attn):.4f}")
# 子图6:注意力统计
ax6 = fig.add_subplot(gs[1, 2])
layer_stats = []
for layer_idx in range(model.n_layers):
attn = model.attention_weights[layer_idx]
# 计算每行的熵(注意力分散度)
entropy = -np.sum(attn * np.log(attn + 1e-10), axis=1)
layer_stats.append(np.mean(entropy))
ax6.bar([f'层{i+1}' for i in range(model.n_layers)], layer_stats,
color=['skyblue', 'lightcoral'])
ax6.set_ylabel('平均注意力熵')
ax6.set_title('注意力分散度')
ax6.grid(True, alpha=0.3)
# 子图7-9:不同时间尺度的注意力模式
# 分析不同时间尺度的关注
time_scales = [
('短期 (1-2h)', 2),
('中期 (6-12h)', 12),
('长期 (24h+)', 24)
]
for idx, (scale_name, scale_len) in enumerate(time_scales):
ax = fig.add_subplot(gs[2, idx])
# 计算该时间尺度的平均注意力
attn = model.attention_weights[-1] # 使用最后一层
scale_attn = np.zeros((seq_len, seq_len))
for i in range(seq_len):
for j in range(seq_len):
if abs(i - j) <= scale_len:
scale_attn[i, j] = attn[i, j]
im = ax.imshow(scale_attn, cmap='Blues', aspect='auto')
ax.set_title(f'{scale_name}注意力')
ax.set_xlabel('键位置')
ax.set_ylabel('查询位置')
plt.colorbar(im, ax=ax, fraction=0.046)
plt.savefig('case2_attention_visualization.png', dpi=150, bbox_inches='tight')
print("\n结果已保存至: case2_attention_visualization.png")
plt.close()
# ==================== 案例3:多步 ahead 预测 ====================
def case3_multistep_prediction():
"""案例3:多步 ahead 预测"""
print("\n" + "="*70)
print("案例3:多步 ahead 预测")
print("="*70)
# 生成数据
np.random.seed(42)
T_data, t_data = generate_radiation_temperature_data(n_samples=1000)
# 归一化
T_mean = np.mean(T_data)
T_std = np.std(T_data)
T_normalized = (T_data - T_mean) / T_std
# 不同预测步长
prediction_horizons = [1, 5, 10, 20]
seq_len = 20
results = {}
for pred_len in prediction_horizons:
print(f"\n 预测步长: {pred_len}")
# 构造数据
X = []
y = []
for i in range(len(T_normalized) - seq_len - pred_len):
X.append(T_normalized[i:i+seq_len])
y.append(T_normalized[i+seq_len:i+seq_len+pred_len])
X = np.array(X)
y = np.array(y)
# 划分数据集
split_idx = int(0.8 * len(X))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
# 创建模型
model = SimpleTransformer(
input_dim=1,
d_model=32,
n_heads=4,
n_layers=2,
d_ff=128,
output_dim=1
)
# 简化的训练
for _ in range(50):
idx = np.random.randint(0, len(X_train))
x = X_train[idx].reshape(-1, 1)
y_true = y_train[idx, 0]
_ = model.forward(x, training=True)
# 测试
test_errors = []
for i in range(min(50, len(X_test))):
x_test = X_test[i].reshape(-1, 1)
y_true = y_test[i] * T_std + T_mean
y_pred = model.predict_sequence(x_test, pred_len) * T_std + T_mean
errors = np.abs(y_pred - y_true)
test_errors.append(errors)
test_errors = np.array(test_errors)
mean_errors = np.mean(test_errors, axis=0)
results[pred_len] = {
'mean_errors': mean_errors,
'mae': np.mean(mean_errors),
'rmse': np.sqrt(np.mean(mean_errors**2))
}
print(f" MAE: {results[pred_len]['mae']:.2f} K")
print(f" RMSE: {results[pred_len]['rmse']:.2f} K")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('案例3:多步 ahead 预测性能分析', fontsize=14, fontweight='bold')
# 子图1:不同预测步长的误差累积
ax1 = axes[0, 0]
colors = ['blue', 'green', 'orange', 'red']
for idx, (pred_len, color) in enumerate(zip(prediction_horizons, colors)):
steps = np.arange(1, pred_len + 1)
ax1.plot(steps, results[pred_len]['mean_errors'],
marker='o', linewidth=2, label=f'{pred_len}-step', color=color)
ax1.set_xlabel('预测步数')
ax1.set_ylabel('平均绝对误差 (K)')
ax1.set_title('误差随预测步长累积')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 子图2:MAE对比
ax2 = axes[0, 1]
maes = [results[p]['mae'] for p in prediction_horizons]
bars = ax2.bar([f'{p}-step' for p in prediction_horizons], maes,
color=colors, alpha=0.7)
ax2.set_ylabel('MAE (K)')
ax2.set_title('不同预测步长的MAE对比')
ax2.grid(True, alpha=0.3, axis='y')
# 在柱状图上添加数值
for bar, mae in zip(bars, maes):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{mae:.1f}', ha='center', va='bottom')
# 子图3:具体预测案例(10-step)
ax3 = axes[1, 0]
test_idx = 10
x_test = X_test[test_idx].reshape(-1, 1)
y_true = y_test[test_idx] * T_std + T_mean
# 使用10-step模型
model_10 = SimpleTransformer(input_dim=1, d_model=32, n_heads=4, n_layers=2, d_ff=128)
for _ in range(50):
idx = np.random.randint(0, len(X_train))
x = X_train[idx].reshape(-1, 1)
_ = model_10.forward(x, training=True)
y_pred_10 = model_10.predict_sequence(x_test, 10) * T_std + T_mean
time_input = np.arange(seq_len)
time_pred = np.arange(seq_len, seq_len + 10)
ax3.plot(time_input, x_test.flatten() * T_std + T_mean,
'b-', linewidth=2, label='输入', marker='o')
ax3.plot(time_pred, y_true[:10], 'g-', linewidth=2, label='真实', marker='s')
ax3.plot(time_pred, y_pred_10, 'r--', linewidth=2, label='预测', marker='^')
ax3.axvline(x=seq_len-0.5, color='k', linestyle=':', alpha=0.5)
ax3.set_xlabel('时间步')
ax3.set_ylabel('温度 (K)')
ax3.set_title('10-step预测示例')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 子图4:误差分布直方图
ax4 = axes[1, 1]
all_errors = []
for pred_len in prediction_horizons:
all_errors.extend(results[pred_len]['mean_errors'])
ax4.hist(all_errors, bins=20, color='skyblue', alpha=0.7, edgecolor='black')
ax4.axvline(x=np.mean(all_errors), color='r', linestyle='--',
linewidth=2, label=f'均值: {np.mean(all_errors):.2f} K')
ax4.set_xlabel('绝对误差 (K)')
ax4.set_ylabel('频数')
ax4.set_title('预测误差分布')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('case3_multistep_prediction.png', dpi=150, bbox_inches='tight')
print("\n结果已保存至: case3_multistep_prediction.png")
plt.close()
# ==================== 案例4:物理约束Transformer ====================
def case4_physics_constrained_transformer():
"""案例4:物理约束Transformer"""
print("\n" + "="*70)
print("案例4:物理约束Transformer")
print("="*70)
# 生成带物理约束的数据
np.random.seed(42)
# 模拟加热炉温度变化
n_samples = 600
t = np.linspace(0, 60, n_samples) # 60小时
# 加热阶段
heating = t < 20
holding = (t >= 20) & (t < 40)
cooling = t >= 40
T = np.zeros(n_samples)
T[0] = 300 # 初始温度
# 物理参数
dt = t[1] - t[0]
thermal_inertia = 1000 # 热惯性 (J/K)
max_heating_power = 2000 # 最大加热功率 (W)
cooling_coeff = 0.05 # 冷却系数
for i in range(1, n_samples):
if heating[i]:
# 加热阶段:温度上升(限制温升)
dT = max_heating_power / thermal_inertia * dt * 3600
dT = min(dT, 5) # 每步最大升温5K
elif holding[i]:
# 保温阶段:小幅波动
dT = np.random.normal(0, 0.5)
else:
# 冷却阶段:自然冷却
dT = -cooling_coeff * (T[i-1] - 300) * dt * 3600
dT = max(dT, -3) # 每步最大降温3K
T[i] = T[i-1] + dT
T[i] += np.random.normal(0, 1) # 添加噪声
T[i] = np.clip(T[i], 300, 800) # 限制温度范围
print(f"\n物理场景:加热炉温度控制")
print(f" 加热阶段: 0-20h")
print(f" 保温阶段: 20-40h")
print(f" 冷却阶段: 40-60h")
print(f" 最高温度: {np.max(T):.1f} K")
# 归一化
T_mean = np.mean(T)
T_std = np.std(T)
T_normalized = (T - T_mean) / T_std
# 构造序列
seq_len = 24
pred_len = 12
X = []
y = []
for i in range(len(T_normalized) - seq_len - pred_len):
X.append(T_normalized[i:i+seq_len])
y.append(T_normalized[i+seq_len:i+seq_len+pred_len])
X = np.array(X)
y = np.array(y)
# 创建带物理约束的模型
class PhysicsConstrainedTransformer(SimpleTransformer):
"""物理约束Transformer"""
def __init__(self, *args, thermal_inertia=1000, max_power=2000, **kwargs):
super().__init__(*args, **kwargs)
self.thermal_inertia = thermal_inertia
self.max_power = max_power
def physics_constrained_predict(self, x, prediction_steps, T_std, T_mean):
"""带物理约束的预测"""
predictions = []
current_input = x.copy()
for _ in range(prediction_steps):
# 模型预测
pred = self.forward(current_input, training=False)
pred_denorm = pred[0, 0] * T_std + T_mean
# 物理约束:温度变化率限制
last_temp = current_input[-1, 0] * T_std + T_mean
dT = pred_denorm - last_temp
dt_hours = 1 # 假设每小时一个时间步
# 最大允许温升
max_dT = (self.max_power / self.thermal_inertia) * dt_hours * 3600
# 应用约束
if dT > max_dT:
dT = max_dT
elif dT < -max_dT * 0.5: # 冷却可以快一些
dT = -max_dT * 0.5
constrained_T = last_temp + dT
predictions.append(constrained_T)
# 更新输入
current_input = np.roll(current_input, -1, axis=0)
current_input[-1, 0] = (constrained_T - T_mean) / T_std
return np.array(predictions)
# 创建模型
model = PhysicsConstrainedTransformer(
input_dim=1,
d_model=32,
n_heads=4,
n_layers=2,
d_ff=128,
thermal_inertia=thermal_inertia,
max_power=max_heating_power
)
# 训练
for _ in range(100):
idx = np.random.randint(0, len(X))
x = X[idx].reshape(-1, 1)
_ = model.forward(x, training=True)
# 测试
test_idx = 300
x_test = X[test_idx].reshape(-1, 1)
y_true = y[test_idx] * T_std + T_mean
# 无约束预测
y_pred_unconstrained = model.predict_sequence(x_test, pred_len) * T_std + T_mean
# 有约束预测
y_pred_constrained = model.physics_constrained_predict(
x_test, pred_len, T_std, T_mean
)
# 计算误差
mae_unconstrained = np.mean(np.abs(y_pred_unconstrained - y_true))
mae_constrained = np.mean(np.abs(y_pred_constrained - y_true))
# 检查物理约束违反
def check_physics_violation(temps, max_dT=7.2):
"""检查温度变化率是否违反物理约束"""
violations = 0
for i in range(1, len(temps)):
if abs(temps[i] - temps[i-1]) > max_dT:
violations += 1
return violations
violations_unconstrained = check_physics_violation(y_pred_unconstrained)
violations_constrained = check_physics_violation(y_pred_constrained)
print(f"\n预测结果对比:")
print(f" 无约束MAE: {mae_unconstrained:.2f} K")
print(f" 有约束MAE: {mae_constrained:.2f} K")
print(f" 无约束物理违反: {violations_unconstrained} 次")
print(f" 有约束物理违反: {violations_constrained} 次")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('案例4:物理约束Transformer', fontsize=14, fontweight='bold')
# 子图1:完整温度曲线
ax1 = axes[0, 0]
ax1.plot(t, T, 'b-', linewidth=1.5, label='实际温度')
ax1.axvspan(0, 20, alpha=0.2, color='red', label='加热阶段')
ax1.axvspan(20, 40, alpha=0.2, color='green', label='保温阶段')
ax1.axvspan(40, 60, alpha=0.2, color='blue', label='冷却阶段')
ax1.set_xlabel('时间 (h)')
ax1.set_ylabel('温度 (K)')
ax1.set_title('加热炉温度曲线')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 子图2:预测对比
ax2 = axes[0, 1]
time_input = np.arange(seq_len)
time_pred = np.arange(seq_len, seq_len + pred_len)
x_test_denorm = x_test.flatten() * T_std + T_mean
ax2.plot(time_input, x_test_denorm, 'b-', linewidth=2, label='输入', marker='o')
ax2.plot(time_pred, y_true, 'g-', linewidth=2, label='真实', marker='s')
ax2.plot(time_pred, y_pred_unconstrained, 'r--', linewidth=2,
label='无约束预测', marker='^')
ax2.plot(time_pred, y_pred_constrained, 'm:', linewidth=2,
label='有约束预测', marker='d')
ax2.axvline(x=seq_len-0.5, color='k', linestyle=':', alpha=0.5)
ax2.set_xlabel('时间步')
ax2.set_ylabel('温度 (K)')
ax2.set_title('预测结果对比')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 子图3:温度变化率
ax3 = axes[1, 0]
dT_unconstrained = np.diff(y_pred_unconstrained)
dT_constrained = np.diff(y_pred_constrained)
dT_true = np.diff(y_true)
time_dT = time_pred[1:]
ax3.plot(time_dT, dT_true, 'g-', linewidth=2, label='真实', marker='s')
ax3.plot(time_dT, dT_unconstrained, 'r--', linewidth=2,
label='无约束', marker='^')
ax3.plot(time_dT, dT_constrained, 'm:', linewidth=2,
label='有约束', marker='d')
ax3.axhline(y=7.2, color='r', linestyle='--', alpha=0.5, label='物理上限')
ax3.axhline(y=-3.6, color='r', linestyle='--', alpha=0.5, label='物理下限')
ax3.set_xlabel('时间步')
ax3.set_ylabel('温度变化率 (K/step)')
ax3.set_title('温度变化率对比')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 子图4:物理约束违反统计
ax4 = axes[1, 1]
methods = ['无约束', '有约束']
violations = [violations_unconstrained, violations_constrained]
maes = [mae_unconstrained, mae_constrained]
x_pos = np.arange(len(methods))
width = 0.35
bars1 = ax4.bar(x_pos - width/2, violations, width, label='物理违反次数',
color='coral', alpha=0.7)
ax4_twin = ax4.twinx()
bars2 = ax4_twin.bar(x_pos + width/2, maes, width, label='MAE (K)',
color='skyblue', alpha=0.7)
ax4.set_ylabel('物理违反次数', color='coral')
ax4_twin.set_ylabel('MAE (K)', color='skyblue')
ax4.set_xticks(x_pos)
ax4.set_xticklabels(methods)
ax4.set_title('约束效果对比')
# 添加数值标签
for bar in bars1:
height = bar.get_height()
ax4.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}', ha='center', va='bottom')
for bar in bars2:
height = bar.get_height()
ax4_twin.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.1f}', ha='center', va='bottom')
plt.tight_layout()
plt.savefig('case4_physics_constrained.png', dpi=150, bbox_inches='tight')
print("\n结果已保存至: case4_physics_constrained.png")
plt.close()
# ==================== 案例5:Transformer与传统方法对比 ====================
def case5_comparison_with_traditional():
"""案例5:Transformer与传统方法对比"""
print("\n" + "="*70)
print("案例5:Transformer与传统方法对比")
print("="*70)
# 生成数据
np.random.seed(42)
T_data, t_data = generate_radiation_temperature_data(n_samples=800)
# 归一化
T_mean = np.mean(T_data)
T_std = np.std(T_data)
T_normalized = (T_data - T_mean) / T_std
# 构造数据
seq_len = 20
pred_len = 5
X = []
y = []
for i in range(len(T_normalized) - seq_len - pred_len):
X.append(T_normalized[i:i+seq_len])
y.append(T_normalized[i+seq_len])
X = np.array(X)
y = np.array(y)
# 划分数据集
split_idx = int(0.8 * len(X))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
print(f"\n数据集信息:")
print(f" 训练样本: {len(X_train)}")
print(f" 测试样本: {len(X_test)}")
results = {}
# 方法1:简单移动平均(基线)
print("\n 方法1: 移动平均 (MA)")
y_pred_ma = []
for x in X_test:
pred = np.mean(x[-5:]) # 最后5个点的平均
y_pred_ma.append(pred)
y_pred_ma = np.array(y_pred_ma)
mae_ma = np.mean(np.abs(y_pred_ma - y_test))
rmse_ma = np.sqrt(np.mean((y_pred_ma - y_test)**2))
results['MA'] = {'mae': mae_ma, 'rmse': rmse_ma, 'pred': y_pred_ma}
print(f" MAE: {mae_ma:.4f}, RMSE: {rmse_ma:.4f}")
# 方法2:线性回归
print("\n 方法2: 线性回归 (LR)")
from numpy.polynomial import polynomial as P
y_pred_lr = []
for x in X_test:
# 拟合线性趋势
t = np.arange(len(x))
coeffs = P.polyfit(t, x, 1)
# 外推
pred = P.polyval(len(x), coeffs)
y_pred_lr.append(pred)
y_pred_lr = np.array(y_pred_lr)
mae_lr = np.mean(np.abs(y_pred_lr - y_test))
rmse_lr = np.sqrt(np.mean((y_pred_lr - y_test)**2))
results['LR'] = {'mae': mae_lr, 'rmse': rmse_lr, 'pred': y_pred_lr}
print(f" MAE: {mae_lr:.4f}, RMSE: {rmse_lr:.4f}")
# 方法3:简单RNN(简化版)
print("\n 方法3: 简单RNN")
class SimpleRNN:
def __init__(self, input_size, hidden_size, output_size):
self.hidden_size = hidden_size
self.Wxh = np.random.randn(input_size, hidden_size) * 0.01
self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
self.Why = np.random.randn(hidden_size, output_size) * 0.01
self.bh = np.zeros(hidden_size)
self.by = np.zeros(output_size)
def forward(self, x):
h = np.zeros(self.hidden_size)
for xi in x:
h = np.tanh(xi * self.Wxh + h @ self.Whh + self.bh)
y = h @ self.Why + self.by
return y
rnn = SimpleRNN(1, 32, 1)
# 简化的训练
for _ in range(100):
idx = np.random.randint(0, len(X_train))
x = X_train[idx].reshape(-1, 1)
y_true = y_train[idx]
_ = rnn.forward(x)
y_pred_rnn = []
for x in X_test:
pred = rnn.forward(x.reshape(-1, 1))
y_pred_rnn.append(pred[0])
y_pred_rnn = np.array(y_pred_rnn)
mae_rnn = np.mean(np.abs(y_pred_rnn - y_test))
rmse_rnn = np.sqrt(np.mean((y_pred_rnn - y_test)**2))
results['RNN'] = {'mae': mae_rnn, 'rmse': rmse_rnn, 'pred': y_pred_rnn}
print(f" MAE: {mae_rnn:.4f}, RMSE: {rmse_rnn:.4f}")
# 方法4:Transformer
print("\n 方法4: Transformer")
transformer = SimpleTransformer(
input_dim=1,
d_model=32,
n_heads=4,
n_layers=2,
d_ff=128,
output_dim=1
)
# 训练
for _ in range(100):
idx = np.random.randint(0, len(X_train))
x = X_train[idx].reshape(-1, 1)
_ = transformer.forward(x, training=True)
y_pred_transformer = []
for x in X_test:
pred = transformer.forward(x.reshape(-1, 1), training=False)
y_pred_transformer.append(pred[0, 0])
y_pred_transformer = np.array(y_pred_transformer)
mae_transformer = np.mean(np.abs(y_pred_transformer - y_test))
rmse_transformer = np.sqrt(np.mean((y_pred_transformer - y_test)**2))
results['Transformer'] = {
'mae': mae_transformer,
'rmse': rmse_transformer,
'pred': y_pred_transformer
}
print(f" MAE: {mae_transformer:.4f}, RMSE: {rmse_transformer:.4f}")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('案例5:Transformer与传统方法对比', fontsize=14, fontweight='bold')
# 子图1:MAE对比
ax1 = axes[0, 0]
methods = list(results.keys())
maes = [results[m]['mae'] for m in methods]
colors = ['skyblue', 'lightgreen', 'orange', 'coral']
bars = ax1.bar(methods, maes, color=colors, alpha=0.7)
ax1.set_ylabel('MAE (归一化)')
ax1.set_title('预测精度对比 (MAE)')
ax1.grid(True, alpha=0.3, axis='y')
for bar, mae in zip(bars, maes):
height = bar.get_height()
ax1.text(bar.get_x() + bar.get_width()/2., height,
f'{mae:.4f}', ha='center', va='bottom')
# 子图2:RMSE对比
ax2 = axes[0, 1]
rmses = [results[m]['rmse'] for m in methods]
bars = ax2.bar(methods, rmses, color=colors, alpha=0.7)
ax2.set_ylabel('RMSE (归一化)')
ax2.set_title('预测精度对比 (RMSE)')
ax2.grid(True, alpha=0.3, axis='y')
for bar, rmse in zip(bars, rmses):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height,
f'{rmse:.4f}', ha='center', va='bottom')
# 子图3:预测结果对比(部分样本)
ax3 = axes[1, 0]
n_show = 50
idx_show = slice(0, n_show)
ax3.plot(y_test[idx_show], 'k-', linewidth=2, label='真实值', alpha=0.8)
for method, color in zip(methods, colors):
ax3.plot(results[method]['pred'][idx_show], '--',
linewidth=1.5, label=method, alpha=0.7)
ax3.set_xlabel('样本索引')
ax3.set_ylabel('归一化温度')
ax3.set_title(f'预测结果对比 (前{n_show}个样本)')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 子图4:残差分布
ax4 = axes[1, 1]
for method, color in zip(methods, colors):
residuals = results[method]['pred'].flatten() - y_test.flatten()
ax4.hist(residuals, bins=20, alpha=0.5, label=method, color=color)
ax4.axvline(x=0, color='k', linestyle='--', linewidth=1)
ax4.set_xlabel('残差')
ax4.set_ylabel('频数')
ax4.set_title('残差分布')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('case5_comparison.png', dpi=150, bbox_inches='tight')
print("\n结果已保存至: case5_comparison.png")
plt.close()
# ==================== 案例6:实时辐射换热预测系统 ====================
def case6_realtime_prediction():
"""案例6:实时辐射换热预测系统"""
print("\n" + "="*70)
print("案例6:实时辐射换热预测系统")
print("="*70)
# 生成实时数据流
np.random.seed(42)
n_total = 500
T_data, t_data = generate_radiation_temperature_data(n_samples=n_total)
# 系统参数
window_size = 30 # 观察窗口
prediction_horizon = 10 # 预测窗口
update_interval = 5 # 更新间隔
print(f"\n实时预测系统参数:")
print(f" 观察窗口: {window_size} 个时间步")
print(f" 预测窗口: {prediction_horizon} 个时间步")
print(f" 更新间隔: {update_interval} 个时间步")
print(f" 总时间步: {n_total}")
# 创建模型
model = SimpleTransformer(
input_dim=1,
d_model=32,
n_heads=4,
n_layers=2,
d_ff=128,
output_dim=1
)
# 预训练(使用历史数据)
print("\n预训练模型...")
T_mean = np.mean(T_data[:200])
T_std = np.std(T_data[:200])
T_normalized = (T_data - T_mean) / T_std
for _ in range(100):
idx = np.random.randint(0, 150)
x = T_normalized[idx:idx+window_size].reshape(-1, 1)
_ = model.forward(x, training=True)
# 模拟实时预测
print("\n模拟实时预测...")
predictions_history = []
actual_history = []
timestamps = []
errors_history = []
for t in range(window_size, n_total - prediction_horizon, update_interval):
# 获取观察窗口数据
window_data = T_normalized[t-window_size:t]
# 进行预测
x_input = window_data.reshape(-1, 1)
pred_normalized = model.predict_sequence(x_input, prediction_horizon)
# 反归一化
pred = pred_normalized * T_std + T_mean
actual = T_data[t:t+prediction_horizon]
# 记录结果
predictions_history.append(pred)
actual_history.append(actual)
timestamps.append(t)
# 计算误差
error = np.mean(np.abs(pred - actual))
errors_history.append(error)
if len(timestamps) % 10 == 0:
print(f" 时间步 {t}: MAE = {error:.2f} K")
# 计算统计指标
mean_error = np.mean(errors_history)
max_error = np.max(errors_history)
print(f"\n实时预测统计:")
print(f" 预测次数: {len(predictions_history)}")
print(f" 平均MAE: {mean_error:.2f} K")
print(f" 最大MAE: {max_error:.2f} K")
# 可视化
fig = plt.figure(figsize=(16, 12))
gs = GridSpec(3, 2, figure=fig, hspace=0.3, wspace=0.3)
fig.suptitle('案例6:实时辐射换热预测系统', fontsize=14, fontweight='bold')
# 子图1:完整时序与预测点
ax1 = fig.add_subplot(gs[0, :])
ax1.plot(t_data, T_data, 'b-', linewidth=1, alpha=0.5, label='实际数据')
# 标记预测点
for i, t in enumerate(timestamps[::5]): # 每5个显示一个
if i < len(predictions_history):
pred_start = t
pred_end = t + prediction_horizon
pred_time = t_data[pred_start:pred_end]
pred_vals = predictions_history[i*5]
ax1.plot(pred_time, pred_vals, 'r--', linewidth=1, alpha=0.6)
ax1.scatter([t_data[t]], [T_data[t]], c='green', s=20, zorder=5)
ax1.set_xlabel('时间 (h)')
ax1.set_ylabel('温度 (K)')
ax1.set_title('实时预测轨迹')
ax1.legend(['实际数据', '预测轨迹', '预测起点'])
ax1.grid(True, alpha=0.3)
# 子图2:特定时刻的预测示例
ax2 = fig.add_subplot(gs[1, 0])
example_idx = len(timestamps) // 2
t_example = timestamps[example_idx]
# 显示观察窗口
obs_start = t_example - window_size
obs_end = t_example
obs_time = t_data[obs_start:obs_end]
obs_data = T_data[obs_start:obs_end]
# 显示预测
pred_start = t_example
pred_end = t_example + prediction_horizon
pred_time = t_data[pred_start:pred_end]
pred_data = predictions_history[example_idx]
actual_data = actual_history[example_idx]
ax2.plot(obs_time, obs_data, 'b-', linewidth=2, label='观察窗口')
ax2.plot(pred_time, actual_data, 'g-', linewidth=2, label='真实值', marker='s')
ax2.plot(pred_time, pred_data, 'r--', linewidth=2, label='预测值', marker='^')
ax2.axvline(x=t_data[t_example], color='k', linestyle=':', alpha=0.5)
ax2.set_xlabel('时间 (h)')
ax2.set_ylabel('温度 (K)')
ax2.set_title(f'预测示例 (t={t_data[t_example]:.1f}h)')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 子图3:误差随时间变化
ax3 = fig.add_subplot(gs[1, 1])
error_time = [t_data[t] for t in timestamps]
ax3.plot(error_time, errors_history, 'b-', linewidth=1.5)
ax3.axhline(y=mean_error, color='r', linestyle='--',
linewidth=2, label=f'平均误差: {mean_error:.2f} K')
ax3.fill_between(error_time, 0, errors_history, alpha=0.3)
ax3.set_xlabel('时间 (h)')
ax3.set_ylabel('MAE (K)')
ax3.set_title('预测误差随时间变化')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 子图4:误差分布
ax4 = fig.add_subplot(gs[2, 0])
ax4.hist(errors_history, bins=20, color='skyblue', alpha=0.7, edgecolor='black')
ax4.axvline(x=mean_error, color='r', linestyle='--',
linewidth=2, label=f'均值: {mean_error:.2f} K')
ax4.set_xlabel('MAE (K)')
ax4.set_ylabel('频数')
ax4.set_title('预测误差分布')
ax4.legend()
ax4.grid(True, alpha=0.3)
# 子图5:预测性能指标
ax5 = fig.add_subplot(gs[2, 1])
metrics = ['平均MAE', '最大MAE', '预测次数']
values = [mean_error, max_error, len(predictions_history)]
colors_metrics = ['green', 'red', 'blue']
# 归一化显示
values_norm = [v/max(values) for v in values]
bars = ax5.bar(metrics, values_norm, color=colors_metrics, alpha=0.7)
# 添加实际数值
for bar, val in zip(bars, values):
height = bar.get_height()
ax5.text(bar.get_x() + bar.get_width()/2., height,
f'{val:.1f}' if isinstance(val, float) else f'{int(val)}',
ha='center', va='bottom')
ax5.set_ylabel('归一化值')
ax5.set_title('系统性能指标')
ax5.grid(True, alpha=0.3, axis='y')
plt.savefig('case6_realtime_prediction.png', dpi=150, bbox_inches='tight')
print("\n结果已保存至: case6_realtime_prediction.png")
plt.close()
# ==================== 主函数 ====================
def main():
"""主函数:运行所有案例"""
print("\n" + "="*70)
print("主题086:Transformer模型在辐射序列预测中的应用 - Python仿真")
print("="*70)
# 运行6个案例
case1_basic_transformer()
case2_attention_visualization()
case3_multistep_prediction()
case4_physics_constrained_transformer()
case5_comparison_with_traditional()
case6_realtime_prediction()
print("\n" + "="*70)
print("所有仿真案例已完成!")
print("="*70)
print("\n生成的文件:")
print(" 1. case1_basic_transformer.png - 基础Transformer预测")
print(" 2. case2_attention_visualization.png - 注意力可视化")
print(" 3. case3_multistep_prediction.png - 多步预测")
print(" 4. case4_physics_constrained.png - 物理约束Transformer")
print(" 5. case5_comparison.png - 方法对比")
print(" 6. case6_realtime_prediction.png - 实时预测系统")
if __name__ == "__main__":
main()
附录:关键公式汇总
Transformer基础
缩放点积注意力:
Attention(Q,K,V)=softmax(QKTdk)V\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)VAttention(Q,K,V)=softmax(dkQKT)V
多头注意力:
MultiHead(Q,K,V)=Concat(head1,...,headh)WO\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, ..., \text{head}_h)W^OMultiHead(Q,K,V)=Concat(head1,...,headh)WO
位置编码:
PE(pos,2i)=sin(pos100002i/dmodel)PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right)PE(pos,2i)=sin(100002i/dmodelpos)
辐射换热基础
瞬态辐射平衡:
ρcp∂T∂t=−∇⋅qrad+Qsource\rho c_p \frac{\partial T}{\partial t} = -\nabla \cdot \mathbf{q}_{rad} + Q_{source}ρcp∂t∂T=−∇⋅qrad+Qsource
辐射热流:
qrad=εσ(T4−Tsur4)q_{rad} = \varepsilon \sigma (T^4 - T_{sur}^4)qrad=εσ(T4−Tsur4)
损失函数
组合损失:
L=λ1LMSE+λ2LMAE+λ3Lphysics\mathcal{L} = \lambda_1 \mathcal{L}_{MSE} + \lambda_2 \mathcal{L}_{MAE} + \lambda_3 \mathcal{L}_{physics}L=λ1LMSE+λ2LMAE+λ3Lphysics
本主题由仿真模拟专家编写,旨在帮助读者深入理解Transformer模型在辐射换热时序预测中的应用。建议读者结合Python代码进行实践,加深对理论的理解。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)