使用LSTM模型做股票预测【基于Tensorflow】
LSTM模型简介
LSTM是循环神经网络的一种,它具有长短时记忆的能力,克服了传统RNN在输入序列较长时产生的遗忘问题(即梯度消失)。LSTM通过三个分别称为遗忘门、输入门和输出门的结构控制信息的输入输出。LSTM有两个状态h(隐藏状态)和c(细胞状态),h控制短期记忆,c控制长期记忆。
其结构示意图为:
其各个门的数学表达为:
其中小圆圈表示哈达玛乘积。
最后,再总结一下各个门的客观意义:
遗忘门:控制上个细胞状态有多少信息被保留
输入门:控制当前的输入信息有多少被保留
输出门:控制当前输出有多少信息值得保留
数据集
股票数据总共有九个维度,分别是
由于本人对经济学没有太多研究,所以这些各个维度所代表的信息我也不是很清楚,但在我眼里,它们就是一堆时序数据,而长短时记忆时序模型LSTM处理时序数据具有很强的优势。
项目数据(存储格式是excel):https://pan.baidu.com/s/1qcqCDAATaHapMOs2I_qr6A
提取码:1346
然后我们来简单观察一下数据集的分布。使用pandas库读取excel文件后,将其转换为numpy数组,简单剔除掉代号、日期之类的无用数据后,利用绘图库matplotlib将各个维度的数据分布绘制出来,程序如下:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
#check train data and test data
train_data = pd.read_excel('train_data.xlsx',index_col=0)
train_arrs = np.array(train_data.iloc[:,:])
trains = train_arrs[:,-9:].astype('float32')
test_data = pd.read_excel('test_data.xlsx',index_col=0)
test_arrs = np.array(test_data.iloc[:,:])
tests = test_arrs[:,-9:].astype('float32')
dim_names = ['open','high','low','close','pre_close','change','pct_chg','vol','amount']
#normalize
for dim in range(trains.shape[1]):
trains[:,dim] = (trains[:,dim] - trains[:,dim].min()) / (trains[:,dim].max() - trains[:,dim].min())
for dim in range(tests.shape[1]):
tests[:,dim] = (tests[:,dim] - tests[:,dim].min()) / (tests[:,dim].max() - tests[:,dim].min())
#visualization of train data
for dim in range(trains.shape[1]):
plt.subplot(3,3,dim+1)
plt.plot(trains[:,dim])
plt.title('%s'%(dim_names[dim]))
plt.show()
#visualization of test data
for dim in range(tests.shape[1]):
plt.subplot(3,3,dim+1)
plt.plot(tests[:,dim])
plt.title('%s'%(dim_names[dim]))
plt.show()
效果如下:
对于训练集:
对于测试集:
可以简单看出,各个维度并不是相互独立的,有些分布具有很强的相似性。
接着,我们来创建train_batches和test_batches以训练模型。这里用到的技巧其实不多,有一点提一下,对于数据集使用归一化能加快模型的收敛以及抑制梯度爆炸。然后要注意这是预测模型,不是回归模型,模型的输入是前n个time step的由各个维度构成的向量组,输出的是下个time step的包含各个维度的向量,说起来有点绕,我作了一个简单的示意图,希望能理清你们的思路:
从shape的角度理解的话,那么输入数据的shape为[BATCH_SIZE,TIME_STEP,INPUT_DIM]。输出数据的shape为[BATCH_SIZE,INPUT_DIM]
说明一下,该项目的预测只是对于开盘价,也就是’open’这个维度,所以实际的输出数据的shape为[BATCH_SIZE,1]
创建train/test batches的代码如下:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
#读取训练数据
train_data = pd.read_excel('train_data.xlsx',index_col=0)
train_arrs = np.array(train_data.iloc[:,:])
train_xs = train_arrs[:,-8:].astype('float32')
train_ys = (np.array(train_data['open'],dtype='float32')).reshape(-1,1)
#读取测试数据
test_data = pd.read_excel('test_data.xlsx',index_col=0)
test_arrs = np.array(test_data.iloc[:,:])
test_xs = test_arrs[:,-8:].astype('float32')
test_ys = (np.array(test_data['open'],dtype='float32')).reshape(-1,1)
#归一化
train_ys = (train_ys-train_ys.min()) / (train_ys.max() - train_ys.min())
test_ys = (test_ys-test_ys.min()) / (test_ys.max() - test_ys.min())
for dim in range(train_xs.shape[1]):
train_xs[:,dim] = (train_xs[:,dim] - train_xs[:,dim].min()) / (train_xs[:,dim].max() - train_xs[:,dim].min())
for dim in range(test_xs.shape[1]):
test_xs[:,dim] = (test_xs[:,dim] - test_xs[:,dim].min()) / (test_xs[:,dim].max() - test_xs[:,dim].min())
#由于是预测任务,那么数据的第一个维度会少掉一个time_step-1
time_step = 8
input_dim = 8
aranged_train_xs = np.zeros(shape=(train_xs.shape[0]-time_step+1,time_step,input_dim))
for idx in range(aranged_train_xs.shape[0]):
aranged_train_xs[idx] = train_xs[idx:idx+8]
aranged_test_xs = np.zeros(shape=(test_xs.shape[0]-time_step+1,time_step,input_dim))
for idx in range(aranged_test_xs.shape[0]):
aranged_test_xs[idx] = test_xs[idx:idx+8]
aranged_train_ys = train_ys[time_step-1:]
aranged_test_ys = test_ys[time_step-1:]
#保存数据
np.save(r'train_x_batch.npy',aranged_train_xs)
np.save(r'train_y_batch.npy',aranged_train_ys)
np.save(r'test_x_batch.npy',aranged_test_xs)
np.save(r'test_y_batch.npy',aranged_test_ys)
搭建/训练模型
在整理得到数据集后,我们就可以开始建模了。
建模流程(建议大家都多花时间提升下建模能力):
①定义超参数(batch_size、学习率、epochs、神经元数量等)
②定义待训练参数(从什么分布取样,需要做什么正则化)
③ 定义一个load_data函数,从之前创建的数据集中读取数据
④定义LSTM单元,注意默认的激活函数是tanh,同时可以利用 tf.contrib.rnn.DropoutWrapper增加dropout层。
⑤定义LSTM网络,LSTM接受的是时序数据,所以需要将输入变成一个列表,列表的长度及时间步数。然后使用列表推导技巧(官方的办法)定义多层LSTM网络,建议都使用这个技巧,据了解别的方式BUG很多。最后使用tf.contrib.rnn.static_rnn得到网络输出,注意static和dynamic的区别,前者的时间步数是固定的,而后者是可变的,对于我们创造的数据集,每个Batch的时间步数都相同,所以我们使用static(静态)方式。
⑥训练模型,对于Tensorflow框架,其训练模型的方式大家都应该烂熟于心了。定义placeholder,定义预测函数、损失函数、优化函数等等。然后启动Session(交互式或非交互式),最后迭代进行训练。
整个过程的代码如下:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
plt.rcParams['font.sans-serif']=['SimHei']#显示中文
plt.rcParams['axes.unicode_minus']=False#显示负号
#Hyperparams
batch_size = 128
lr = 1e-6
epochs = 600
num_neurons = [32,32,64,64,128,128]
kp = 1.0
#定义输出层的weight和bias
w = tf.Variable(tf.random_normal([num_neurons[-1],1]))
b = tf.Variable(tf.random_normal([1]))
def load_data():
train_x_batch = np.load(r'train_x_batch.npy',allow_pickle=True)
train_y_batch = np.load(r'train_y_batch.npy',allow_pickle=True)
return (train_x_batch,train_y_batch)
#定义lstm单元
def lstm_cell(units,keep_prob):
cell = tf.contrib.rnn.BasicLSTMCell(num_units=units,forget_bias=0.9)#activation默认为tanh
return tf.contrib.rnn.DropoutWrapper(cell, output_keep_prob=keep_prob)
#定义lstm网络
def lstm_net(x,w,b,num_neurons,keep_prob):
#将输入变成一个列表,列表的长度及时间步数
inputs = tf.unstack(x,8,1)
cells = [lstm_cell(units=n,keep_prob = keep_prob) for n in num_neurons]
stacked_lstm_cells = tf.contrib.rnn.MultiRNNCell(cells)
outputs,_ = tf.contrib.rnn.static_rnn(stacked_lstm_cells,inputs,dtype=tf.float32)
return tf.matmul(outputs[-1],w) + b
if __name__ == '__main__':
#载入数据
(train_x,train_y) = load_data()
#定义placeholder
x = tf.placeholder(shape=(None,8,8),dtype=tf.float32)
y = tf.placeholder(shape=(None,1),dtype=tf.float32)
keep_prob = tf.placeholder(tf.float32,[])
#定义预测函数、损失函数、优化函数、初始函数、保存函数
pred = lstm_net(x,w,b,num_neurons,keep_prob)
cost = tf.reduce_mean(tf.reshape(tf.pow((pred-y),2),[-1]))
optimizer = tf.train.GradientDescentOptimizer(learning_rate=lr).minimize(cost)
init = tf.global_variables_initializer()
saver = tf.train.Saver(tf.global_variables())
#启动交互式Session
sess = tf.InteractiveSession()
#训练模型
sess.run(init)
losses = []#记录每个epoch结束时的损失值
for epoch in range(epochs):
for step in range(train_x.shape[0]//batch_size+1):
batch_x = train_x[step*batch_size:(step+1)*batch_size]
batch_y = train_y[step*batch_size:(step+1)*batch_size]
sess.run(optimizer,feed_dict={x:batch_x,y:batch_y,keep_prob:kp})
loss = sess.run(cost,feed_dict={x:batch_x,y:batch_y,keep_prob:1.0})
losses.append(loss)
print('Epoch[{}/{}],Loss = {:.4f}\n'.format(epoch+1,epochs,loss))
#可视化训练过程
plt.plot(losses)
plt.ylim(0,1.2*max(losses))
plt.title('损失值随迭代周期的改变')
plt.xlabel('Epoch')
plt.ylabel('损失值')
plt.show()
#保存模型
#saver.save(sess,r'model_data\my_model.ckpt')
#关闭会话
sess.close()
训练过程的损失曲线为:
使用模型进行预测
重头戏来了,所谓是骡子还是马,拉出来遛一遛就知道了,你的建模能力再强,数据处理能力再好,如果模型最终效果不行,都白搭。
要做预测,就要写个predictor脚本,方式其实更建模差不多,区别就是不用训练参数,而是加载已经训练好的参数,代码如下:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
plt.rcParams['font.sans-serif']=['SimHei']#显示中文
plt.rcParams['axes.unicode_minus']=False#显示负号
def load_data():
test_x_batch = np.load(r'test_x_batch.npy',allow_pickle=True)
test_y_batch = np.load(r'test_y_batch.npy',allow_pickle=True)
return (test_x_batch,test_y_batch)
#定义lstm单元
def lstm_cell(units):
cell = tf.contrib.rnn.BasicLSTMCell(num_units=units,forget_bias=0.0)#activation默认为tanh
return cell
#定义lstm网络
def lstm_net(x,w,b,num_neurons):
#将输入变成一个列表,列表的长度及时间步数
inputs = tf.unstack(x,8,1)
cells = [lstm_cell(units=n) for n in num_neurons]
stacked_lstm_cells = tf.contrib.rnn.MultiRNNCell(cells)
outputs,_ = tf.contrib.rnn.static_rnn(stacked_lstm_cells,inputs,dtype=tf.float32)
return tf.matmul(outputs[-1],w) + b
#超参数
num_neurons = [32,32,64,64,128,128]
#定义输出层的weight和bias
w = tf.Variable(tf.random_normal([num_neurons[-1],1]))
b = tf.Variable(tf.random_normal([1]))
#定义placeholder
x = tf.placeholder(shape=(None,8,8),dtype=tf.float32)
#定义pred和saver
pred = lstm_net(x,w,b,num_neurons)
saver = tf.train.Saver(tf.global_variables())
if __name__ == '__main__':
#开启交互式Session
sess = tf.InteractiveSession()
saver.restore(sess,r'D:\股票预测\model_data\my_model.ckpt')
#载入数据
test_x,test_y = load_data()
#预测
predicts = sess.run(pred,feed_dict={x:test_x})
predicts = ((predicts.max() - predicts) / (predicts.max() - predicts.min()))#数学校准
#可视化
plt.plot(predicts,'r',label='预测曲线')
plt.plot(test_y,'g',label='真实曲线')
plt.xlabel('第几天/days')
plt.ylabel('开盘价(归一化)')
plt.title('股票开盘价曲线预测(测试集)')
plt.legend()
plt.show()
#关闭会话
sess.close()
最终效果如图所示:
后话
在训练过程中,本人遇到过两个问题,我将它们命名为颠倒性和超前/滞后性。前者是指,最终的预测曲线的趋势与真实曲线完完全全相反,比如真实曲线上升达到极大值,预测曲线就下降达到极小值,然而,在将预测曲线上下颠倒后,其与真实曲线又能很好吻合!在经过仔细地研究和排查后,发现这种现象貌似不是来源于程序,而是具有随机性,或者用哲学的话来说,感觉这是时序模型的一个种属(就像欲望之于人类)。后者是所有时序模型的通病,就是说预测模型会永远多多少少比真实模型超前或者滞后。怎么说呢,这两点的存在,使得预测和现实总是有个无法逾越的鸿沟。希望未来能看到能够完美预测的时序模型。
更多推荐
所有评论(0)