Many-to-many LSTM in both Keras and PyTorch

The same multivariate forecasting task implemented twice: a shared helper module generates three noisy increasing series and splits them into supervised samples, then a Keras script and a PyTorch script each train an LSTM on the data and forecast 12 steps ahead recursively.
common.py (the shared helper module imported by both scripts below):
from numpy import array
from numpy import linspace
from numpy import random
from numpy import zeros
from numpy import vstack
import torch

# Split a multivariate sequence into samples
def split_sequences(sequences, n_steps):
    X, y = list(), list()
    for i in range(len(sequences)):
        # find the end of this pattern
        end_ix = i + n_steps
        # check if we are beyond the dataset
        if end_ix > len(sequences) - 1:
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequences[i:end_ix, :], sequences[end_ix, :]
        X.append(seq_x)
        y.append(seq_y)
    return array(X), array(y)

# Create a set of data where the columns contain increasing series plus Gaussian noise
def get_data():
    data = linspace(1, 300, 300) + random.normal(0, 1, 300)
    data = data.reshape(100, 3, order='F')
    return data

# Create a recursive prediction using a Keras model
def predict_keras(data, h, n_features, n_steps, model):
    x_input = data[-n_steps:, :]
    pred = zeros((h, n_features))
    for i in range(h):
        x_tensor = x_input.reshape((1, n_steps, n_features))
        pred[i, :] = model.predict(x_tensor, verbose=0)
        # drop the oldest observation and append the new prediction
        x_input = vstack((x_input[1:, :], pred[i, :]))
    return pred

# Create a recursive prediction using a PyTorch model
def predict_torch(data, h, n_features, n_steps, model):
    x_input = data[-n_steps:, :]
    pred = zeros((h, n_features))
    for i in range(h):
        x_tensor = torch.tensor(x_input.reshape((1, n_steps, n_features)), dtype=torch.float32)
        model.init_hidden(1)
        pred[i, :] = model(x_tensor).detach().numpy()
        # drop the oldest observation and append the new prediction
        x_input = vstack((x_input[1:, :], pred[i, :]))
    return pred
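For orientation, a quick check of the shapes these helpers produce, assuming n_steps = 12 as in both scripts below:

import common

data = common.get_data()
X, y = common.split_sequences(data, 12)
print(data.shape)  # (100, 3): three noisy increasing series
print(X.shape)     # (88, 12, 3): 88 windows of 12 timesteps, 3 features
print(y.shape)     # (88, 3): the observation following each window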
The Keras implementation:
import common
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers import Dense

# Define the number of timesteps and features
n_features = 3
n_steps = 12

# Generate data and create supervised learning examples
data = common.get_data()
X, y = common.split_sequences(data, n_steps)

# Define the Keras model: two stacked LSTM layers followed by a dense output
model = Sequential()
model.add(LSTM(100, activation='relu', return_sequences=True, input_shape=(n_steps, n_features)))
model.add(LSTM(100, activation='relu'))
model.add(Dense(n_features))
model.compile(optimizer='adam', loss='mse')

# Train the model on the data
model.fit(X, y, epochs=400, verbose=1)

# Predict h=12 steps ahead recursively
pred = common.predict_keras(data, 12, n_features, n_steps, model)
print(pred)
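Because get_data builds column j as a noisy ramp over the values 100*j + 1 ... 100*j + 100, the noiseless continuation of the series is known in closed form, so one way to sanity-check the forecast is a sketch like this (pred comes from the call above):

import numpy as np

# noiseless continuation: step i ahead of column j is 100*j + 100 + i
steps = np.arange(1, 13).reshape(-1, 1)        # horizons 1..12, shape (12, 1)
truth = steps + 100 + np.array([0, 100, 200])  # broadcast to (12, 3)
rmse = np.sqrt(np.mean((pred - truth) ** 2))
print('RMSE vs noiseless continuation:', rmse)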
The PyTorch implementation:
import common
import torch

# Define the PyTorch model
class torchLSTM(torch.nn.Module):
    def __init__(self, n_features, seq_length):
        super(torchLSTM, self).__init__()
        self.n_features = n_features
        self.seq_len = seq_length
        self.n_hidden = 100  # number of hidden states
        self.n_layers = 1    # number of LSTM layers (stacked)
        self.l_lstm = torch.nn.LSTM(input_size=n_features,
                                    hidden_size=self.n_hidden,
                                    num_layers=self.n_layers,
                                    batch_first=True)
        # according to the PyTorch docs, LSTM output is
        # (batch_size, seq_len, num_directions * hidden_size)
        # when batch_first=True
        self.l_linear = torch.nn.Linear(self.n_hidden * self.seq_len, n_features)

    def init_hidden(self, batch_size):
        # even with batch_first=True, the hidden state shape stays as in the docs
        hidden_state = torch.zeros(self.n_layers, batch_size, self.n_hidden)
        cell_state = torch.zeros(self.n_layers, batch_size, self.n_hidden)
        self.hidden = (hidden_state, cell_state)

    def forward(self, x):
        batch_size, seq_len, _ = x.size()
        lstm_out, self.hidden = self.l_lstm(x, self.hidden)
        # lstm_out (with batch_first=True) is
        # (batch_size, seq_len, num_directions * hidden_size);
        # for the following linear layer we keep the batch dimension and merge the rest
        # .contiguous() solves a tensor compatibility error from view()
        x = lstm_out.contiguous().view(batch_size, -1)
        return self.l_linear(x)

# Define the number of timesteps and features
n_features = 3
n_steps = 12

# Generate data and create supervised learning examples
data = common.get_data()
X, y = common.split_sequences(data, n_steps)

# Instantiate the model
mv_net = torchLSTM(n_features, n_steps)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(mv_net.parameters(), lr=1e-1)
epochs = 400
batch_size = 16

# Train the model
mv_net.train()
for t in range(epochs):
    for b in range(0, len(X), batch_size):
        inpt = X[b:b + batch_size, :, :]
        target = y[b:b + batch_size]
        x_batch = torch.tensor(inpt, dtype=torch.float32)
        y_batch = torch.tensor(target, dtype=torch.float32)
        # reset the hidden state for each batch (the last batch may be smaller)
        mv_net.init_hidden(x_batch.size(0))
        output = mv_net(x_batch)
        loss = criterion(output, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print('step : ', t, 'loss : ', loss.item())

# Predict h=12 steps ahead recursively
pred = common.predict_torch(data, 12, n_features, n_steps, mv_net)
print(pred)
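The training loop above feeds the windows in order with manually sliced batches. A possible variant uses torch.utils.data.TensorDataset and DataLoader instead; this is a sketch, and shuffling the windows is an assumption about what helps here, not something the script above does:

from torch.utils.data import TensorDataset, DataLoader

dataset = TensorDataset(torch.tensor(X, dtype=torch.float32),
                        torch.tensor(y, dtype=torch.float32))
loader = DataLoader(dataset, batch_size=16, shuffle=True)

for t in range(epochs):
    for x_batch, y_batch in loader:
        mv_net.init_hidden(x_batch.size(0))  # size the hidden state to this batch
        loss = criterion(mv_net(x_batch), y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()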