LSTM模型简介

LSTM是循环神经网络的一种,它具有长短时记忆的能力,克服了传统RNN在输入序列较长时产生的遗忘问题(即梯度消失)。LSTM通过三个分别称为遗忘门、输入门和输出门的结构控制信息的输入输出。LSTM有两个状态h(隐藏状态)和c(细胞状态),h控制短期记忆,c控制长期记忆。
其结构示意图为:
在这里插入图片描述
其各个门的数学表达为:
在这里插入图片描述
其中小圆圈表示哈达玛乘积。
最后,再总结一下各个门的客观意义:
遗忘门:控制上个细胞状态有多少信息被保留
输入门:控制当前的输入信息有多少被保留
输出门:控制当前输出有多少信息值得保留

数据集

股票数据总共有九个维度,分别是在这里插入图片描述
由于本人对经济学没有太多研究,所以这些各个维度所代表的信息我也不是很清楚,但在我眼里,它们就是一堆时序数据,而长短时记忆时序模型LSTM处理时序数据具有很强的优势。
项目数据(存储格式是excel):https://pan.baidu.com/s/1qcqCDAATaHapMOs2I_qr6A
提取码:1346

然后我们来简单观察一下数据集的分布。使用pandas库读取excel文件后,将其转换为numpy数组,简单剔除掉代号、日期之类的无用数据后,利用绘图库matplotlib将各个维度的数据分布绘制出来,程序如下:

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

#check train data and test data
train_data = pd.read_excel('train_data.xlsx',index_col=0)
train_arrs = np.array(train_data.iloc[:,:])
trains = train_arrs[:,-9:].astype('float32')

test_data = pd.read_excel('test_data.xlsx',index_col=0)
test_arrs = np.array(test_data.iloc[:,:])
tests = test_arrs[:,-9:].astype('float32')

dim_names = ['open','high','low','close','pre_close','change','pct_chg','vol','amount']

#normalize
for dim in range(trains.shape[1]):
    trains[:,dim] = (trains[:,dim] - trains[:,dim].min()) / (trains[:,dim].max() - trains[:,dim].min())

for dim in range(tests.shape[1]):
    tests[:,dim]  = (tests[:,dim] - tests[:,dim].min()) / (tests[:,dim].max() - tests[:,dim].min())

#visualization of train data
for dim in range(trains.shape[1]):
    plt.subplot(3,3,dim+1)
    plt.plot(trains[:,dim])
    plt.title('%s'%(dim_names[dim]))
plt.show()

#visualization of test data
for dim in range(tests.shape[1]):
    plt.subplot(3,3,dim+1)
    plt.plot(tests[:,dim])
    plt.title('%s'%(dim_names[dim]))
plt.show()

效果如下:
对于训练集:
在这里插入图片描述
对于测试集:
在这里插入图片描述
可以简单看出,各个维度并不是相互独立的,有些分布具有很强的相似性。

接着,我们来创建train_batches和test_batches以训练模型。这里用到的技巧其实不多,有一点提一下,对于数据集使用归一化能加快模型的收敛以及抑制梯度爆炸。然后要注意这是预测模型,不是回归模型,模型的输入是前n个time step的由各个维度构成的向量组,输出的是下个time step的包含各个维度的向量,说起来有点绕,我作了一个简单的示意图,希望能理清你们的思路:
在这里插入图片描述
从shape的角度理解的话,那么输入数据的shape为[BATCH_SIZE,TIME_STEP,INPUT_DIM]。输出数据的shape为[BATCH_SIZE,INPUT_DIM]
说明一下,该项目的预测只是对于开盘价,也就是’open’这个维度,所以实际的输出数据的shape为[BATCH_SIZE,1]

创建train/test batches的代码如下:

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

#读取训练数据
train_data = pd.read_excel('train_data.xlsx',index_col=0)
train_arrs = np.array(train_data.iloc[:,:])
train_xs = train_arrs[:,-8:].astype('float32')
train_ys = (np.array(train_data['open'],dtype='float32')).reshape(-1,1)
#读取测试数据
test_data = pd.read_excel('test_data.xlsx',index_col=0)
test_arrs = np.array(test_data.iloc[:,:])
test_xs = test_arrs[:,-8:].astype('float32')
test_ys = (np.array(test_data['open'],dtype='float32')).reshape(-1,1)

#归一化
train_ys = (train_ys-train_ys.min()) / (train_ys.max() - train_ys.min())
test_ys  = (test_ys-test_ys.min())   / (test_ys.max()  - test_ys.min())
for dim in range(train_xs.shape[1]):
    train_xs[:,dim] = (train_xs[:,dim] - train_xs[:,dim].min()) / (train_xs[:,dim].max() - train_xs[:,dim].min())

for dim in range(test_xs.shape[1]):
    test_xs[:,dim]  = (test_xs[:,dim] - test_xs[:,dim].min()) / (test_xs[:,dim].max() - test_xs[:,dim].min())

#由于是预测任务,那么数据的第一个维度会少掉一个time_step-1
time_step = 8
input_dim = 8

aranged_train_xs = np.zeros(shape=(train_xs.shape[0]-time_step+1,time_step,input_dim))
for idx in range(aranged_train_xs.shape[0]):
    aranged_train_xs[idx] = train_xs[idx:idx+8]

aranged_test_xs = np.zeros(shape=(test_xs.shape[0]-time_step+1,time_step,input_dim))
for idx in range(aranged_test_xs.shape[0]):
    aranged_test_xs[idx] = test_xs[idx:idx+8]

aranged_train_ys = train_ys[time_step-1:]
aranged_test_ys  =  test_ys[time_step-1:]

#保存数据
np.save(r'train_x_batch.npy',aranged_train_xs)
np.save(r'train_y_batch.npy',aranged_train_ys)
np.save(r'test_x_batch.npy',aranged_test_xs)
np.save(r'test_y_batch.npy',aranged_test_ys)

搭建/训练模型

在整理得到数据集后,我们就可以开始建模了。
建模流程(建议大家都多花时间提升下建模能力):
①定义超参数(batch_size、学习率、epochs、神经元数量等)
②定义待训练参数(从什么分布取样,需要做什么正则化)
③ 定义一个load_data函数,从之前创建的数据集中读取数据
④定义LSTM单元,注意默认的激活函数是tanh,同时可以利用 tf.contrib.rnn.DropoutWrapper增加dropout层。
⑤定义LSTM网络,LSTM接受的是时序数据,所以需要将输入变成一个列表,列表的长度及时间步数。然后使用列表推导技巧(官方的办法)定义多层LSTM网络,建议都使用这个技巧,据了解别的方式BUG很多。最后使用tf.contrib.rnn.static_rnn得到网络输出,注意static和dynamic的区别,前者的时间步数是固定的,而后者是可变的,对于我们创造的数据集,每个Batch的时间步数都相同,所以我们使用static(静态)方式。
⑥训练模型,对于Tensorflow框架,其训练模型的方式大家都应该烂熟于心了。定义placeholder,定义预测函数、损失函数、优化函数等等。然后启动Session(交互式或非交互式),最后迭代进行训练。

整个过程的代码如下:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
plt.rcParams['font.sans-serif']=['SimHei']#显示中文
plt.rcParams['axes.unicode_minus']=False#显示负号

#Hyperparams
batch_size = 128
lr = 1e-6
epochs = 600
num_neurons = [32,32,64,64,128,128]
kp = 1.0

#定义输出层的weight和bias
w = tf.Variable(tf.random_normal([num_neurons[-1],1]))
b = tf.Variable(tf.random_normal([1]))

def load_data():
    train_x_batch = np.load(r'train_x_batch.npy',allow_pickle=True)
    train_y_batch = np.load(r'train_y_batch.npy',allow_pickle=True)
    return (train_x_batch,train_y_batch)

#定义lstm单元
def lstm_cell(units,keep_prob):
    cell = tf.contrib.rnn.BasicLSTMCell(num_units=units,forget_bias=0.9)#activation默认为tanh
    return tf.contrib.rnn.DropoutWrapper(cell, output_keep_prob=keep_prob)

#定义lstm网络
def lstm_net(x,w,b,num_neurons,keep_prob):
    #将输入变成一个列表,列表的长度及时间步数
    inputs = tf.unstack(x,8,1)
    cells = [lstm_cell(units=n,keep_prob = keep_prob) for n in num_neurons]
    stacked_lstm_cells = tf.contrib.rnn.MultiRNNCell(cells)
    outputs,_ =  tf.contrib.rnn.static_rnn(stacked_lstm_cells,inputs,dtype=tf.float32)
    return tf.matmul(outputs[-1],w) + b


if __name__ == '__main__':

    #载入数据
    (train_x,train_y) = load_data()

    #定义placeholder
    x = tf.placeholder(shape=(None,8,8),dtype=tf.float32)
    y = tf.placeholder(shape=(None,1),dtype=tf.float32)
    keep_prob = tf.placeholder(tf.float32,[])

    #定义预测函数、损失函数、优化函数、初始函数、保存函数
    pred = lstm_net(x,w,b,num_neurons,keep_prob)
    cost = tf.reduce_mean(tf.reshape(tf.pow((pred-y),2),[-1]))
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=lr).minimize(cost)
    init  = tf.global_variables_initializer()
    saver = tf.train.Saver(tf.global_variables())

    #启动交互式Session
    sess = tf.InteractiveSession()

    #训练模型
    sess.run(init)
    losses = []#记录每个epoch结束时的损失值
    for epoch in range(epochs):
        for step in range(train_x.shape[0]//batch_size+1):
            batch_x = train_x[step*batch_size:(step+1)*batch_size]
            batch_y = train_y[step*batch_size:(step+1)*batch_size]
            sess.run(optimizer,feed_dict={x:batch_x,y:batch_y,keep_prob:kp})
            
        loss = sess.run(cost,feed_dict={x:batch_x,y:batch_y,keep_prob:1.0})
        losses.append(loss)
        print('Epoch[{}/{}],Loss = {:.4f}\n'.format(epoch+1,epochs,loss))

    #可视化训练过程
    plt.plot(losses)
    plt.ylim(0,1.2*max(losses))
    plt.title('损失值随迭代周期的改变')
    plt.xlabel('Epoch')
    plt.ylabel('损失值')
    plt.show()

    #保存模型
    #saver.save(sess,r'model_data\my_model.ckpt')

    #关闭会话
    sess.close()

训练过程的损失曲线为:
在这里插入图片描述

使用模型进行预测

重头戏来了,所谓是骡子还是马,拉出来遛一遛就知道了,你的建模能力再强,数据处理能力再好,如果模型最终效果不行,都白搭。
要做预测,就要写个predictor脚本,方式其实更建模差不多,区别就是不用训练参数,而是加载已经训练好的参数,代码如下:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
plt.rcParams['font.sans-serif']=['SimHei']#显示中文
plt.rcParams['axes.unicode_minus']=False#显示负号

def load_data():
    test_x_batch = np.load(r'test_x_batch.npy',allow_pickle=True)
    test_y_batch = np.load(r'test_y_batch.npy',allow_pickle=True)
    return (test_x_batch,test_y_batch)

#定义lstm单元
def lstm_cell(units):
    cell = tf.contrib.rnn.BasicLSTMCell(num_units=units,forget_bias=0.0)#activation默认为tanh
    return cell

#定义lstm网络
def lstm_net(x,w,b,num_neurons):
    #将输入变成一个列表,列表的长度及时间步数
    inputs = tf.unstack(x,8,1)
    cells = [lstm_cell(units=n) for n in num_neurons]
    stacked_lstm_cells = tf.contrib.rnn.MultiRNNCell(cells)
    outputs,_ =  tf.contrib.rnn.static_rnn(stacked_lstm_cells,inputs,dtype=tf.float32)
    return tf.matmul(outputs[-1],w) + b

#超参数
num_neurons = [32,32,64,64,128,128]

#定义输出层的weight和bias
w = tf.Variable(tf.random_normal([num_neurons[-1],1]))
b = tf.Variable(tf.random_normal([1]))

#定义placeholder
x = tf.placeholder(shape=(None,8,8),dtype=tf.float32)

#定义pred和saver
pred = lstm_net(x,w,b,num_neurons)
saver = tf.train.Saver(tf.global_variables())

if __name__ == '__main__':

    #开启交互式Session
    sess = tf.InteractiveSession()
    saver.restore(sess,r'D:\股票预测\model_data\my_model.ckpt')

    #载入数据
    test_x,test_y = load_data()

    #预测
    predicts = sess.run(pred,feed_dict={x:test_x})
    predicts = ((predicts.max() - predicts) / (predicts.max() - predicts.min()))#数学校准

    #可视化
    plt.plot(predicts,'r',label='预测曲线')
    plt.plot(test_y,'g',label='真实曲线')
    plt.xlabel('第几天/days')
    plt.ylabel('开盘价(归一化)')
    plt.title('股票开盘价曲线预测(测试集)')
    plt.legend()
	plt.show()
    #关闭会话
    sess.close()	

最终效果如图所示:
在这里插入图片描述

后话

在训练过程中,本人遇到过两个问题,我将它们命名为颠倒性和超前/滞后性。前者是指,最终的预测曲线的趋势与真实曲线完完全全相反,比如真实曲线上升达到极大值,预测曲线就下降达到极小值,然而,在将预测曲线上下颠倒后,其与真实曲线又能很好吻合!在经过仔细地研究和排查后,发现这种现象貌似不是来源于程序,而是具有随机性,或者用哲学的话来说,感觉这是时序模型的一个种属(就像欲望之于人类)。后者是所有时序模型的通病,就是说预测模型会永远多多少少比真实模型超前或者滞后。怎么说呢,这两点的存在,使得预测和现实总是有个无法逾越的鸿沟。希望未来能看到能够完美预测的时序模型。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐