import pandas as pd
import numpy as np
import patsy
import tensorflow as tf


class SleepReg(tf.Module):

    def __init__(self, sleepdata_path):
        """Initialize TensorFlow variables and other necessary matrices."""
        # These two TensorFlow variables show up in trainable_variables
        self.beta = tf.Variable(tf.zeros([2, 1], dtype=tf.float64))
        self.b = tf.Variable(tf.zeros([36, 1], dtype=tf.float64))

        self.set_optimizer()

        self.data = self.get_sleepstudy_data(sleepdata_path)
        self.N_subjects = 18
        self.y = self.data.Reaction.values.reshape((180, 1))
        self.X = self.get_X_matrix()
        self.Z = self.get_Z_matrix()

        # Initialize Sigma_b and sigmasq_epsilon at lme4's estimates for the
        # sleepstudy data set, for demonstration purposes
        self.Sigma_b = np.array([[24.7404 ** 2, .066 * 5.9221 * 24.7404],
                                 [.066 * 24.7404 * 5.9221, 5.9221 ** 2]])
        self.sigmasq_epsilon = 25.5918 ** 2

        self.V = self.get_V_matrix()
        self.H = self.get_H_matrix()
        self.df = self.get_approximate_df()

    def zero_coefficients(self):
        """Reset beta and b to zeros, e.g. before retraining."""
        self.beta = tf.Variable(tf.zeros([2, 1], dtype=tf.float64))
        self.b = tf.Variable(tf.zeros([36, 1], dtype=tf.float64))

    def reset_variances(self, Sigma_b, sigmasq_epsilon):
        """Set new variance components and refresh dependent matrices."""
        self.Sigma_b = Sigma_b
        self.sigmasq_epsilon = sigmasq_epsilon
        # Update matrices that depend on these variances
        self.V = self.get_V_matrix()
        self.H = self.get_H_matrix()
        self.df = self.get_approximate_df()

    def get_sleepstudy_data(self, sleepdata_path):
        """Read the sleepstudy CSV and add a string SubjectId column."""
        sleep_df = pd.read_csv(sleepdata_path)
        sleep_df['SubjectId'] = pd.Categorical(sleep_df['Subject']).codes
        sleep_df['SubjectId'] = sleep_df.SubjectId.apply(lambda x: str(x))
        return sleep_df

    def get_X_matrix(self):
        """Fixed effects design matrix: intercept and Days."""
        return patsy.dmatrix('1 + Days', data=self.data)

    def get_Z_matrix(self):
        """Gets the textbook (block diagonal) random effects Z matrix."""
        Z_int = patsy.dmatrix('0 + SubjectId', data=self.data)
        Z_slope = patsy.dmatrix('0 + SubjectId:Days', data=self.data)
        Z_version1 = np.concatenate((Z_int, Z_slope), axis=1)

        # Prepare a column permutation matrix
        P = np.zeros((2 * self.N_subjects, 2 * self.N_subjects))
        for i in range(self.N_subjects):
            P[i, 2 * i] = 1.0
            P[self.N_subjects + i, 2 * i + 1] = 1.0

        Z = np.matmul(Z_version1, P)  # Textbook Z matrix
        return Z
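
    # Note: patsy groups the columns of Z_version1 by effect type,
    # [int_1, ..., int_18, slope_1, ..., slope_18]; post-multiplying by the
    # permutation P interleaves them as [int_1, slope_1, int_2, slope_2, ...],
    # the per-subject ordering that matches the Kronecker-structured V
    # built in get_V_matrix().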

    def get_V_matrix(self):
        """Get the inverse of the (block diagonal) random effects
        covariance matrix: I_18 kron Sigma_b^{-1}."""
        Sigma_b_inv = np.linalg.inv(self.Sigma_b)
        V = np.kron(np.identity(self.N_subjects), Sigma_b_inv)
        return V

    def get_H_matrix(self):
        """Define the hat matrix H such that y_hat = Hy."""
        Xt = np.transpose(self.X)
        Zt = np.transpose(self.Z)

        XtX_s = np.matmul(Xt, self.X) / self.sigmasq_epsilon
        XtZ_s = np.matmul(Xt, self.Z) / self.sigmasq_epsilon
        ZtX_s = np.matmul(Zt, self.X) / self.sigmasq_epsilon
        ZtZ_s_plus_V = np.matmul(Zt, self.Z) / self.sigmasq_epsilon + self.V

        X_and_Z = np.hstack((self.X, self.Z))
        Xt_over_Zt_s = np.vstack((Xt, Zt)) / self.sigmasq_epsilon

        D = np.vstack((np.hstack((XtX_s, XtZ_s)),
                       np.hstack((ZtX_s, ZtZ_s_plus_V))))
        D_inv = np.linalg.inv(D)
        H = np.matmul(np.matmul(X_and_Z, D_inv), Xt_over_Zt_s)
        return H
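
    # Note: D above is the coefficient matrix of Henderson's mixed model
    # equations. With C = [X Z], the joint solution for (beta_hat, b_hat)
    # is D^{-1} C' y / sigmasq_epsilon, so y_hat = C D^{-1} C' y /
    # sigmasq_epsilon = Hy, and trace(H) serves as an effective degrees
    # of freedom in get_approximate_df().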

    def get_approximate_df(self):
        """Approximate the effective degrees of freedom by trace(H)."""
        return np.trace(self.H)

    def get_fitted_values(self):
        return np.matmul(self.H, self.y)

    def get_random_effects(self):
        """Return per-subject (intercept, slope) random effect estimates."""
        b_hat = np.squeeze(self.b.numpy())
        rand_effs = np.reshape(b_hat, (self.N_subjects, 2))
        return pd.DataFrame(rand_effs, columns=['mu', 'b'])

    def get_rnd_effs_variance(self):
        """Returns the variance matrix of the computed random effects."""
        rand_effs = self.get_random_effects()
        return np.cov(rand_effs, rowvar=False)

    def estimate_sigmasq_epsilon(self):
        """Estimate the residual variance from residuals and trace(H)."""
        y_hat = self.get_fitted_values()
        df = self.get_approximate_df()
        resid = self.y - y_hat
        est = np.matmul(resid.T, resid) / (resid.shape[0] - df)
        return est.ravel()

    @tf.function
    def _get_expectation(self, X, Z, beta, b):
        return tf.matmul(X, beta) + tf.matmul(Z, b)

    @tf.function
    def _get_sse(self, y, y_hat):
        return tf.reduce_sum(tf.square(y - y_hat))

    @tf.function
    def _get_neg_log_prior(self, b, V):
        """Get the weight penalty from the full Gaussian prior on b."""
        bTV = tf.matmul(tf.transpose(b), V)
        bTVb = tf.matmul(bTV, b)
        return tf.squeeze(bTVb)
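
    # Note: the training loss in train() below is the penalized least squares
    # criterion SSE / sigmasq_epsilon + b'Vb, which is -2 times the log joint
    # density of (y, b) up to an additive constant when the variance
    # components are held fixed; minimizing it gives the mixed-model fixed
    # effect estimates and the BLUPs of b for the given Sigma_b and
    # sigmasq_epsilon.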

    def set_optimizer(self, adam=False):
        """Choose the optimizer for the model training task."""
        self.optimizer = (tf.optimizers.Adam() if adam else
                          tf.optimizers.SGD(learning_rate=.025, momentum=.98))

    def train(self, epochs=1500, display_beta=True):
        """Trains the model using a TensorFlow training loop."""
        X = tf.constant(self.X)
        Z = tf.constant(self.Z)
        y = tf.constant(self.y)
        V = tf.constant(self.V)
        sigmasq_int = tf.constant(self.Sigma_b[0, 0])    # intercept variance (unused below)
        sigmasq_slope = tf.constant(self.Sigma_b[1, 1])  # slope variance (unused below)

        for epoch in range(epochs):
            with tf.GradientTape() as gradient_tape:
                y_pred = self._get_expectation(X, Z, self.beta, self.b)
                loss = (self._get_sse(y, y_pred) / self.sigmasq_epsilon
                        + self._get_neg_log_prior(self.b, V))
            gradient = gradient_tape.gradient(loss, self.trainable_variables)
            self.optimizer.apply_gradients(zip(gradient,
                                               self.trainable_variables))
            if display_beta:
                print(self.beta)
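

# A minimal usage sketch, assuming a local file 'sleepstudy.csv' (hypothetical
# path) with columns Reaction, Days, and Subject, e.g. lme4's sleepstudy data
# exported from R via write.csv(sleepstudy, 'sleepstudy.csv', row.names=FALSE).
if __name__ == '__main__':
    sleep_reg = SleepReg('sleepstudy.csv')
    sleep_reg.train(epochs=1500, display_beta=False)
    print(sleep_reg.beta.numpy())                 # fixed effects: intercept, Days
    print(sleep_reg.get_random_effects().head())  # per-subject deviations
    print(sleep_reg.estimate_sigmasq_epsilon())   # residual variance estimate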