Last active
March 17, 2021 17:11
-
-
Save oscar-defelice/6f3aff440730cf92e4e6c9cd8eae181b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json

import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential, load_model
def read_config(config_file_path):
    """
    read_config function.
    It opens a JSON file and returns its decoded content.

    Arguments:
        config_file_path    path of the JSON file to read.
    Returns:
        config              whatever json.load decodes from the file.
    """
    # JSON text is UTF-8; pin the encoding so decoding does not depend on
    # the platform's locale default.
    with open(config_file_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def feature_selection(data):
    """
    feature_selection function.
    Hook for choosing which feature columns to keep.

    Arguments:
        data            np.array of shape (n_training_example, n_features)
    Returns:
        data_reduced    np.array of shape (n_training_example, n_reduced_features)
    """
    # Every feature is kept for now; adapt this selection to your analysis.
    selected = data

    # Selecting features must never change the number of examples.
    rows_before = data.shape[0]
    rows_after = selected.shape[0]
    assert rows_after == rows_before, "Data leaking!"
    return selected
def feature_normalisation(data):
    """
    feature_normalisation function.
    It standardises each feature column to zero mean and unit standard deviation.

    The input array is left untouched: the computation is done on a floating
    point copy, so integer arrays are accepted as well.
    Columns with zero standard deviation (constant features) are only centred,
    i.e. they come out as all zeros instead of NaN.

    Arguments:
        data                np.array of shape (n_training_example, n_features)
    Returns:
        data_normalised     np.array of shape (n_training_example, n_features)
    """
    data = np.asarray(data)
    # Keep an existing float dtype; promote anything else (e.g. ints) to float64
    # so the in-place arithmetic below cannot fail on casting.
    dtype = data.dtype if np.issubdtype(data.dtype, np.floating) else np.float64
    # astype(copy=True) guarantees we never write into the caller's array.
    data_normalised = data.astype(dtype, copy=True)

    mean = data_normalised.mean(axis=0)
    data_normalised -= mean

    std = data_normalised.std(axis=0)
    # Avoid 0/0 on constant features: dividing the centred zeros by 1 keeps them at 0.
    std = np.where(std == 0, 1.0, std)
    data_normalised /= std

    return data_normalised
def import_data(config):
    """
    import_data function.
    It reads the dataset from a JSON file, optionally normalises the features
    and splits it by means of sklearn.model_selection.train_test_split.

    Arguments:
        config      dict containing the following variables
            data                path of a JSON file, read with read_config, holding
                                'data'   the feature vectors, one row per example.
                                'target' the target values.
            normalise           if truthy, features go through feature_normalisation.
            train_test_ratio    float, optional
                                passed as test_size to train_test_split.
                                default: 0.2

    Returns:
        tuple of four np.arrays (X_train, X_test, Y_train, Y_test) of shape
            - X_train (n_training_examples, n_features)
            - X_test  (n_test_examples, n_features)
            - Y_train (n_training_examples, )
            - Y_test  (n_test_examples, )
    """
    data_path = config['data']
    normalise = config['normalise']
    # Honour the documented default instead of raising KeyError when the key is omitted.
    train_test_ratio = config.get('train_test_ratio', 0.2)

    data = read_config(data_path)

    X = feature_selection(np.array(data['data']))
    if normalise:
        # NOTE(review): normalisation statistics are computed on the whole dataset,
        # i.e. before the train/test split below.
        X = feature_normalisation(X)
    Y = np.array(data['target'])

    # Fixed random_state keeps the split reproducible between runs.
    X_train, X_test, Y_train, Y_test = train_test_split(
        X, Y, test_size=train_test_ratio, random_state=42
    )

    assert X_train.shape[1] == X_test.shape[1], "Train and test shapes do not correspond!"
    return X_train, X_test, Y_train, Y_test
def get_model(config, num_features):
    """
    get_model function.
    It loads a pretrained model, or builds and compiles a new Sequential one.

    Arguments:
        config          dict containing the following variables
            pretrained_model    value handed to load_model, or None (null in JSON)
                                to build a new model from model_config.
            model_config        dict, read only when pretrained_model is None, with keys
                                model_name      name given to the Sequential model.
                                layers          dict mapping layer name -> number of units,
                                                iterated in order to stack Dense layers.
                                activations     sequence indexed by layer position.
                                loss_function, optimiser, metrics
                                                forwarded to model.compile.
        num_features    used as input_dim of the first Dense layer.
    Returns:
        model           the loaded or newly compiled model.
    """
    pretrained = config['pretrained_model']
    # `None` is Python's null value; the undefined name `Null` raised NameError here.
    if pretrained is not None:
        model = load_model(pretrained)
    else:
        params = config['model_config']
        model_name, layers, activations = params['model_name'], params['layers'], params['activations']
        loss, opt, metrics = params['loss_function'], params['optimiser'], params['metrics']

        model = Sequential(name=model_name)
        for index, (name, n_units) in enumerate(layers.items()):
            if index == 0:
                # Only the first layer needs to be told the input dimensionality.
                model.add(Dense(units=n_units, input_dim=num_features, activation=activations[index], name=name))
            else:
                model.add(Dense(units=n_units, activation=activations[index], name=name))

        model.compile(loss=loss, optimizer=opt, metrics=metrics)

    model.summary()
    return model
def train(config):
    """
    Training function over data.
    It imports the data, gets a model and fits it, using the test split
    as validation data.

    Arguments:
        config      dict containing, besides what import_data and get_model read,
            training_config     dict with 'batch_size' and 'epochs', forwarded to model.fit.
            save_model          if truthy, the fitted model is saved.
            model_path          destination given to model.save; only required
                                when save_model is truthy.
    Returns:
        hist        the object returned by model.fit.
    """
    training_config = config['training_config']
    batch_size, epochs = training_config['batch_size'], training_config['epochs']
    save = config['save_model']
    # Resolve the path before training so a missing key fails fast,
    # but do not demand it when nothing is going to be saved.
    model_path = config['model_path'] if save else None

    X_train, X_test, Y_train, Y_test = import_data(config)
    num_features = X_train.shape[1]

    model = get_model(config, num_features)
    hist = model.fit(
        X_train,
        Y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_test, Y_test),
    )

    if save:
        model.save(model_path)

    return hist
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment