import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
# Data preprocessing: turn a time-ordered table into supervised-learning windows.
def preprocess_data(data, n_in, n_out=1, drop_end=True, scale=True):
    """Build sliding-window (input, target) samples from time-ordered rows.

    Args:
        data: pandas DataFrame (or any 1-D/2-D array-like) with one row per
            time step and one column per feature. It is not modified.
        n_in: number of consecutive rows in each input window.
        n_out: number of rows, directly after the input window, that form
            the target of that window.
        drop_end: when true, the last ``n_out`` rows are discarded before
            any scaling or windowing.
        scale: when true, every column is rescaled to [-1, 1] with a
            ``MinMaxScaler``. The scaler is fitted on the rows of this call
            only and is not returned, so separate calls (e.g. train and
            test) are scaled independently of each other.

    Returns:
        A tuple ``(X, y)`` of float32 arrays shaped
        ``(samples, n_in, features)`` and ``(samples, n_out, features)``,
        where ``samples = rows - n_in - n_out + 1``. When there are too few
        rows for a single window, both arrays have zero samples.

    Raises:
        ValueError: if ``n_in`` or ``n_out`` is smaller than 1.
    """
    if n_in < 1 or n_out < 1:
        raise ValueError("n_in and n_out must both be at least 1")

    # Work on a plain ndarray so windows can be sliced positionally whether or
    # not scaling is applied (a DataFrame does not accept `df[i:j, :]`).
    values = np.asarray(data)
    if values.ndim == 1:
        values = values.reshape(-1, 1)  # single series -> one feature column

    if drop_end:
        # Positional slice: drops exactly the last n_out rows, independent of
        # the index labels of the input.
        values = values[:-n_out]

    if scale and values.shape[0] > 0:
        scaler = MinMaxScaler(feature_range=(-1, 1))
        values = scaler.fit_transform(values)

    values = values.astype('float32')
    print(values.shape)

    n_features = values.shape[1]
    n_samples = values.shape[0] - n_in - n_out + 1
    if n_samples <= 0:
        # Not enough rows for even one window: return correctly shaped empties.
        return (
            np.empty((0, n_in, n_features), dtype='float32'),
            np.empty((0, n_out, n_features), dtype='float32'),
        )

    # Stack windows directly into 3-D arrays; no extra singleton axis, so the
    # result is (samples, time steps, features).
    X = np.stack([values[i:i + n_in] for i in range(n_samples)])
    y = np.stack(
        [values[i + n_in:i + n_in + n_out] for i in range(n_samples)]
    )
    return X, y
# Model definition and training.
def train_model(train_X, train_y, n_epochs, batch_size, n_neurons=50, n_layers=2):
    """Build a stacked-LSTM regressor and fit it on the given windows.

    Args:
        train_X: array shaped ``(samples, time steps, features)``.
        train_y: array whose first axis is samples. All remaining axes are
            flattened, so each sample is fitted against one target vector.
        n_epochs: number of training epochs.
        batch_size: number of samples per gradient update.
        n_neurons: number of units in every LSTM layer.
        n_layers: number of stacked LSTM layers (at least 1).

    Returns:
        The fitted ``Sequential`` model. Its output has one row per sample
        and ``prod(train_y.shape[1:])`` columns.

    Raises:
        ValueError: if ``n_layers`` is smaller than 1 or ``train_X`` is not
            3-dimensional.
    """
    if n_layers < 1:
        raise ValueError("n_layers must be at least 1")

    train_X = np.asarray(train_X)
    train_y = np.asarray(train_y)
    if train_X.ndim != 3:
        raise ValueError(
            "train_X must be 3-D (samples, time steps, features), got shape "
            f"{train_X.shape}"
        )

    # One flat target vector per sample; the final Dense layer is sized to it.
    n_targets = int(np.prod(train_y.shape[1:]))
    targets = train_y.reshape((train_y.shape[0], n_targets))

    model = Sequential()
    for layer_idx in range(n_layers):
        # Intermediate layers must emit the full sequence for the next LSTM;
        # the last one emits only its final state so Dense yields one vector
        # per sample instead of one per time step.
        layer_kwargs = {"return_sequences": layer_idx < n_layers - 1}
        if layer_idx == 0:
            layer_kwargs["input_shape"] = (train_X.shape[1], train_X.shape[2])
        model.add(LSTM(n_neurons, **layer_kwargs))
    model.add(Dense(n_targets, activation='linear'))
    model.compile(loss='mse', optimizer='adam')
    # shuffle=False keeps the samples in the order they were passed in.
    model.fit(
        train_X,
        targets,
        epochs=n_epochs,
        batch_size=batch_size,
        verbose=2,
        shuffle=False,
    )
    return model
# Model evaluation.
def evaluate_model(model, test_X, test_y):
    """Print and return the mean squared error of ``model`` on a test set.

    Args:
        model: object with a ``predict(test_X)`` method returning an array.
        test_X: inputs, passed to ``model.predict`` unchanged.
        test_y: ground-truth values. Must contain as many elements as the
            predictions; its exact shape does not matter.

    Returns:
        The mean squared error averaged over every predicted element (the
        same value that is printed).

    Raises:
        ValueError: if predictions and ``test_y`` differ in element count.
    """
    # Flatten both sides the same way so multi-dimensional targets compare
    # element by element with the predictions.
    preds = np.asarray(model.predict(test_X)).reshape((-1, 1))
    truth = np.asarray(test_y).reshape((-1, 1))
    if preds.shape != truth.shape:
        raise ValueError(
            f"predictions have {preds.size} elements but test_y has "
            f"{truth.size}"
        )
    mse = mean_squared_error(truth, preds)
    print(mse)
    return mse
# Example usage.
# Windowing and training hyperparameters.
n_in = 10  # length of each input sequence
n_out = 1  # length of each output sequence
n_epochs = 100  # number of training epochs
batch_size = 1  # batch size
n_neurons = 50  # number of units in each LSTM hidden layer
n_layers = 2  # number of LSTM layers

# `data` is assumed to be a pandas DataFrame containing all of the data.
# Everything except the last 60 rows becomes the training set; the last
# 60 rows become the test set. Each split is preprocessed separately.
train_X, train_y = preprocess_data(data[:-60], n_in=n_in, n_out=n_out)
test_X, test_y = preprocess_data(data[-60:], n_in=n_in, n_out=n_out)

# Train the model.
model = train_model(
    train_X,
    train_y,
    n_epochs=n_epochs,
    batch_size=batch_size,
    n_neurons=n_neurons,
    n_layers=n_layers,
)
# Evaluation