Tensorflow实现LSTM详解
关于什么是 LSTM 我就不详细阐述了,吴恩达老师视频课里面讲的很好,我大概记录了课上的内容在吴恩达《序列模型》笔记一,网上也有很多写的好的解释,比如:LSTM入门、理解LSTM网络
然而,理解挺简单,上手写的时候还是遇到了很多的问题,网上大部分的博客都没有讲清楚 cell 参数的设置,在我看了N多篇文章后终于搞明白了,写出来让大家少走一些弯路吧!
如上图是一个LSTM的单元,可以应用到多种RNN结构中,常用的应该是 one-to-many 和 many-to-many
下面介绍 many-to-many 这种结构:
- batch_size:批度训练大小,即让 batch_size 个句子同时训练。
- time_steps:时间长度,即句子的长度
- embedding_size:组成句子的单词的向量长度(embedding size)
- hidden_size:隐藏单元数,一个LSTM结构是一个神经网络(如上图就是一个LSTM单元),每个小黄框是一个神经网络,小黄框的隐藏单元数就是hidden_size,那么这个LSTM单元就有 4*hidden_size 个隐藏单元。
- 每个LSTM单元的输出 C、h,都是向量,他们的长度都是当前 LSTM 单元的 hidden_size。
- n_words:语料库中单词个数。
实现方式一:
import tensorflow as tf
import numpy as np
from tensorflow.contrib import rnn
def add_layer(inputs, in_size, out_size, activation_function=None): # 单层神经网络
weights = tf.Variable(tf.random_normal([in_size, out_size]))
baises = tf.Variable(tf.zeros([1, out_size]) + 0.1)
wx_b = tf.matmul(inputs, weights) + baises
if activation_function is None:
outputs = wx_b
else:
outputs = activation_function(wx_b)
return outputs
n_words = 15
embedding_size = 8
hidden_size = 8 # 一般hidden_size和embedding_size是相同的
batch_size = 3
time_steps = 5
w = tf.Variable(tf.random_normal([n_words, embedding_size], stddev=0.01)) # 模拟参数 W
sentence = tf.Variable(np.arange(15).reshape(batch_size, time_step, 1)) # 模拟训练的句子:3条句子,每个句子5个单词 shape(3,5,1)
input_s = tf.nn.embedding_lookup(w, sentence) # 将单词映射到向量:每个单词变成了size为8的向量 shape=(3,5,1,8)
input_s = tf.reshape(input_s, [-1, 5, 8]) # shape(3,5,8)
with tf.name_scope("LSTM"): # trust
lstm_cell = rnn.BasicLSTMCell(hidden_size, state_is_tuple=True, name='lstm_layer')
h_0 = tf.zeros([batch_size, embedding_size]) # shape=(3,8)
c_0 = tf.zeros([batch_size, embedding_size]) # shape=(3,8)
state = rnn.LSTMStateTuple(c=c_0, h=h_0) # 设置初始状态
outputs = []
for i in range(time_steps): # 句子长度
if i > 0: tf.get_variable_scope().reuse_variables() # 名字相同cell使用的参数w就一样,为了避免重名引起别的的问题,设置一下变量重用
output, state = lstm_cell(input_s[:, i, :], state) # output:[batch_size,embedding_size] shape=(3,8)
outputs.append(output) # outputs:[TIME_STEP,batch_size,embedding_size] shape=(5,3,8)
path = tf.concat(outputs, 1) # path:[batch_size,embedding_size*TIME_STEP] shape=(3, 40)
path_embedding = add_layer(path, time_step * embedding_size, embedding_size) # path_embedding:[batch_size, embedding_size]
with tf.Session() as s:
s.run(tf.global_variables_initializer())
# 因为使用的参数数量都还比较小,打印一些变量看看就能明白是怎么操作的
print(s.run(outputs))
print(s.run(path_embedding))
比如一批训练64句话,每句话20个单词,每个词向量长度为200,隐藏层单元个数为128
那么训练一批句子,输入的张量维度是[64,20,200],ht,ct 的维度是[128],那么LSTM单元参数矩阵的维度是[128+200,4x128],
在时刻1,把64句话的第一个单词作为输入,即输入一个[64,200]的矩阵,由于会和 ht 进行concat,输入矩阵变成了[64,200+128],输入矩阵会和参数矩阵[200+128,4x128]相乘,输出为[64,4x128],也就是每个黄框的输出为[64,128],黄框之间会进行一些操作,但不改变维度,输出依旧是[64,128],即每个句子经过LSTM单元后,输出的维度是128,所以每个LSTM输出的都是向量,包括Ct,ht,所以它们的长度都是当前LSTM单元的hidden_size 。那么我们就知道cell_output的维度为[64,128]
之后的时刻重复刚才同样的操作,那么outputs的维度是[20,64,128].
softmax相当于全连接层,将outputs映射到vocab_size个单词上,进行交叉熵误差计算。
然后根据误差更新LSTM参数矩阵和全连接层的参数。
实现方式二:
测试数据链接:https://pan.baidu.com/s/1j9sgPmWUHM5boM5ekj3Q2w 提取码:go3f
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
data = pd.read_excel("seq_data.xlsx") # 读取序列数据
data = data.values[1:800] # 取前800个
normalize_data = (data - np.mean(data)) / np.std(data) # 标准化数据
s = np.std(data)
m = np.mean(data)
time_step = 96 # 序列段长度
rnn_unit = 8 # 隐藏层节点数目
lstm_layers = 2 # cell层数
batch_size = 7 # 序列段批处理数目
input_size = 1 # 输入维度
output_size = 1 # 输出维度
lr = 0.006 # 学习率
train_x, train_y = [], []
for i in range(len(data) - time_step - 1):
x = normalize_data[i:i + time_step]
y = normalize_data[i + 1:i + time_step + 1]
train_x.append(x.tolist())
train_y.append(y.tolist())
X = tf.placeholder(tf.float32, [None, time_step, input_size]) # shape(?,time_step, input_size)
Y = tf.placeholder(tf.float32, [None, time_step, output_size]) # shape(?,time_step, out_size)
weights = {'in': tf.Variable(tf.random_normal([input_size, rnn_unit])),
'out': tf.Variable(tf.random_normal([rnn_unit, 1]))}
biases = {'in': tf.Variable(tf.constant(0.1, shape=[rnn_unit, ])),
'out': tf.Variable(tf.constant(0.1, shape=[1, ]))}
def lstm(batch):
w_in = weights['in']
b_in = biases['in']
input = tf.reshape(X, [-1, input_size])
input_rnn = tf.matmul(input, w_in) + b_in
input_rnn = tf.reshape(input_rnn, [-1, time_step, rnn_unit])
cell = tf.nn.rnn_cell.MultiRNNCell([tf.nn.rnn_cell.BasicLSTMCell(rnn_unit) for i in range(lstm_layers)])
init_state = cell.zero_state(batch, dtype=tf.float32)
output_rnn, final_states = tf.nn.dynamic_rnn(cell, input_rnn, initial_state=init_state, dtype=tf.float32)
output = tf.reshape(output_rnn, [-1, rnn_unit])
w_out = weights['out']
b_out = biases['out']
pred = tf.matmul(output, w_out) + b_out
return pred, final_states
def train_lstm():
global batch_size
with tf.variable_scope("sec_lstm"):
pred, _ = lstm(batch_size)
loss = tf.reduce_mean(tf.square(tf.reshape(pred, [-1]) - tf.reshape(Y, [-1])))
train_op = tf.train.AdamOptimizer(lr).minimize(loss)
saver = tf.train.Saver(tf.global_variables())
loss_list = []
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(100): # We can increase the number of iterations to gain better result.
start = 0
end = start + batch_size
while (end < len(train_x)):
_, loss_ = sess.run([train_op, loss], feed_dict={X: train_x[start:end], Y: train_y[start:end]})
start += batch_size
end = end + batch_size
loss_list.append(loss_)
if i % 10 == 0:
print("Number of iterations:", i, " loss:", loss_list[-1])
if i > 0 and loss_list[-2] > loss_list[-1]:saver.save(sess, 'model_save1\\modle.ckpt')
# I run the code in windows 10,so use 'model_save1\\modle.ckpt'
# if you run it in Linux,please use 'model_save1/modle.ckpt'
print("The train has finished")
train_lstm()
def prediction():
with tf.variable_scope("sec_lstm", reuse=tf.AUTO_REUSE):
pred, _ = lstm(1)
saver = tf.train.Saver(tf.global_variables())
with tf.Session() as sess:
saver.restore(sess, 'model_save1\\modle.ckpt')
# I run the code in windows 10,so use 'model_save1\\modle.ckpt'
# if you run it in Linux,please use 'model_save1/modle.ckpt'
predict = []
for i in range(0, np.shape(train_x)[0]):
next_seq = sess.run(pred, feed_dict={X: [train_x[i]]})
predict.append(next_seq[-1])
plt.figure()
plt.plot(list(range(len(data))), data, color='b')
plt.plot(list(range(time_step + 1, np.shape(train_x)[0] + 1 + time_step)), [value * s + m for value in predict],color='r')
plt.show()
prediction()
参考文章:
更多推荐
所有评论(0)