Last active
June 27, 2022 13:26
-
-
Save mparker2/8c6dff1f8d354b1c2fd58d87c351e1fe to your computer and use it in GitHub Desktop.
some numpy functions for dynamic time warping
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from scipy.spatial.distance import cdist | |
from .dtw import dtw | |
from .lb import envelope, lb_keogh_cdist, lb_keogh_from_bounds | |
from .window import window_ts | |
def dtw_nearest_neighbours(query, db, bound_reach, step_size, subseq=False): | |
assert query.ndim == 1 | |
assert db.ndim == 2 | |
assert query.flags.contiguous | |
query_lb, query_ub = envelope(query, bound_reach) | |
query_len = query.shape[0] | |
db_len = db.shape[1] | |
if query_len > db_len: | |
windows = window_ts(query, size=db_len, step_size=step_size) | |
envelope_windows = window_ts(np.c_[query_lb, query_ub], | |
size=db_len, | |
step_size=step_size, | |
axis=0).transpose(0, 2, 1) | |
elif query_len == db_len: | |
windows = query.reshape(1, -1) | |
envelope_windows = np.expand_dims(np.c_[query_lb, query_ub].T, 0) | |
else: | |
raise NotImplementedError() | |
lb_scores = lb_keogh_cdist(envelope_windows, db).min(0) | |
best_lb_idx = np.argsort(lb_scores) | |
best_dtw = np.inf | |
nearest_neighbour = None | |
nn_idx = None | |
for idx in best_lb_idx: | |
if lb_scores[idx] < best_dtw: | |
candidate = db[idx] | |
dtw_score = dtw(candidate, query, subseq=subseq) | |
if dtw_score < best_dtw: | |
best_dtw = dtw_score | |
nearest_neighbour = candidate | |
nn_idx = idx | |
return nearest_neighbour, best_dtw, nn_idx | |
### DTW Kmeans ++ initialisation: | |
# adapted from https://github.com/scikit-learn/scikit-learn/blob/da71b827b8b56bd8305b7fe6c13724c7b5355209/sklearn/cluster/k_means_.py#L44 | |
def kmeans_plus_plus(X, n_clusters, metric='dtw', bound_reach=5): | |
n_samples, n_features = X.shape | |
if metric == 'dtw': | |
def metric(X, Y): | |
_metric = partial(dtw, subseq=False, backtrack=False, return_matrix=False) | |
return cdist(X, Y, metric=_metric) | |
elif metric == 'dtw_subseq': | |
def metric(X, Y): | |
_metric = partial(dtw, subseq=True, backtrack=False, return_matrix=False) | |
return cdist(X, Y, metric=_metric) | |
# dtw km++ is very slow so a rougher but faster approach is lb_keogh | |
elif metric == 'lb_keogh': | |
def metric(X, Y): | |
n_X = X.shape[0] | |
n_Y = Y.shape[0] | |
X_envelope = np.array([envelope(x, bound_reach) for x in X]) | |
# we cannot use scipy cdist here as X_envelope is 3 dimensional (2nd dim is 2: lb & ub) | |
# use numpy to hopefully get something approaching the speed of cdist | |
dists = np.empty(shape=(n_X, n_Y)) | |
ii, jj = np.mgrid[:n_X, :n_Y] | |
for i, j in np.c_[ii.ravel(), jj.ravel()]: | |
dists[i, j] = lb_keogh_from_bounds( | |
Y[j], X_envelope[i, 0], X_envelope[i, 1]) | |
return dists | |
centers = np.empty((n_clusters, n_features), dtype=X.dtype) | |
n_local_trials = 2 + int(np.log(n_clusters)) | |
# Pick first center randomly | |
center_id = np.random.randint(n_samples) | |
centers[0] = X[center_id] | |
# Initialize list of closest distances and calculate current potential | |
dists = metric(centers[0, np.newaxis], X) | |
potential = dists.sum() | |
# Pick the remaining n_clusters-1 points | |
for c in range(1, n_clusters): | |
# Choose center candidates by sampling with probability proportional | |
# to the squared distance to the closest existing center | |
rand_vals = np.random.random_sample(n_local_trials) * potential | |
candidate_ids = np.searchsorted(np.cumsum(dists), rand_vals) | |
# Compute distances to center candidates | |
candidate_dists = metric(X[candidate_ids], X) | |
# Decide which candidate is the best | |
best_candidate = None | |
best_potential = None | |
best_dists = None | |
for candidate_id in range(n_local_trials): | |
# Compute potential when including center candidate | |
new_dists = np.minimum(dists, candidate_dists[candidate_id]) | |
new_potential = new_dists.sum() | |
# Store result if it is the best local trial so far | |
if (best_candidate is None) or (new_potential < best_potential): | |
best_candidate = candidate_ids[candidate_id] | |
best_potential = new_potential | |
best_dists = new_dists | |
# Permanently add best center candidate found in local tries | |
centers[c] = X[best_candidate] | |
potential = best_potential | |
dists = best_dists | |
return centers |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from scipy.spatial.distance import pdist, squareform | |
from .dtw import dtw | |
def medoid(X): | |
dists = squareform(pdist(X)) | |
dist_sum = dists.sum(0) | |
m = X[dist_sum.argmin()] | |
return m | |
def _update_dba(t, X): | |
t_update = np.zeros_like(t) | |
t_count = np.zeros_like(t) | |
for x in X: | |
_, path = dtw(t, x, backtrack=True) | |
for i, j in path: | |
t_update[i] += x[j] | |
t_count[i] += 1 | |
t_update /= t_count | |
return np.asarray(t_update) | |
def dba(X, it=20, tol=0.05): | |
t = medoid(X) | |
for _ in range(it): | |
t_update = _update_dba(t, X) | |
if np.allclose(t, t_update, 0.05): | |
return t_update | |
else: | |
t = t_update | |
return t |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from scipy.spatial.distance import cdist | |
def accum_cost_and_path_matrix(cost, subseq): | |
ij = np.array(cost.shape) + 1 | |
accum_cost = np.full(ij, np.inf, dtype=cost.dtype) | |
accum_cost[0, 0] = 0 | |
if subseq: | |
accum_cost[0, :] = 0 | |
path_matrix = np.zeros(ij, dtype='i') | |
ii, jj = np.mgrid[1: ij[0], 1: ij[1]] | |
for i, j in np.c_[ii.ravel(), jj.ravel()]: | |
pos_cost = cost[i - 1, j - 1] | |
steps = [[i - 1, i, i - 1], [j - 1, j - 1, j]] | |
step_cost = accum_cost[steps] | |
step_type = np.argmin(step_cost) | |
path_matrix[i, j] = step_type | |
accum_cost[i, j] = pos_cost + step_cost[step_type] | |
return accum_cost[1:, 1:], path_matrix[1:, 1:] | |
def backtrack_path_matrix(path_matrix, min_idx=None): | |
idx = np.array(path_matrix.shape) - 1 | |
if min_idx is not None: | |
idx[1] = min_idx | |
step_types = np.array([[-1, -1], [0, -1], [-1, 0]]) | |
path = [idx.copy(), ] | |
while np.all(idx != 0): | |
idx += step_types[path_matrix[idx[0], idx[1]]] | |
path.append(idx.copy()) | |
return np.asarray(path) | |
def dtw(X, Y, | |
metric='cityblock', | |
backtrack=False, | |
return_matrix=False, | |
subseq=False): | |
cost = cdist(X.reshape(-1, 1), | |
Y.reshape(-1, 1), | |
metric=metric) | |
accum_cost, path_matrix = accum_cost_and_path_matrix(cost, subseq) | |
if subseq: | |
min_idx = np.argmin(accum_cost[-1, :]) | |
else: | |
min_idx = path_matrix.shape[1] - 1 | |
if not backtrack: | |
if return_matrix: | |
return accum_cost | |
else: | |
return accum_cost[-1, min_idx] | |
else: | |
path = backtrack_path_matrix(path_matrix, min_idx) | |
if return_matrix: | |
return accum_cost, path | |
else: | |
return accum_cost[-1, min_idx], path | |
def dtw_dist(X1, X2=None, subseq=False): | |
def _dtw(x, y): | |
cost = dtw(x, y, | |
backtrack=False, | |
return_matrix=False, | |
subseq=subseq) | |
if X2 is None: | |
dists = squareform(pdist(X1, metric=_dtw)) | |
else: | |
dists = cdist(X1, X2, metric=_dtw) | |
return dists |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from scipy import ndimage as ndi | |
def envelope(X, bound_reach): | |
lb = ndi.minimum_filter1d(X, bound_reach, axis=0, mode='reflect') | |
ub = ndi.maximum_filter1d(X, bound_reach, axis=0, mode='reflect') | |
return lb, ub | |
def lb_keogh_from_bounds(Y, lb, ub): | |
scores = np.select( | |
condlist=[Y < lb, Y > ub], | |
choicelist=[lb - Y, Y - ub], | |
default=0 | |
) | |
return scores.sum() | |
def lb_keogh(X, Y, bound_reach): | |
lb, ub = envelope(X, bound_reach) | |
return lb_keogh_from_bounds(Y, lb, ub) | |
def lb_improved(X, Y, bound_reach): | |
lb, ub = envelope(X, bound_reach) | |
lb_k = lb_keogh_from_bounds(Y, lb, ub) | |
x_dash = np.clip(Y, lb, ub) | |
xd_lb, xd_ub = envelope(x_dash, bound_reach) | |
lb_i = lb_k + lb_keogh_from_bounds(X, xd_lb, xd_ub) | |
return lb_i | |
def lb_keogh_cdist(envelope_windows, db): | |
n_windows = envelope_windows.shape[0] | |
n_entries = db.shape[0] | |
results = np.empty(shape=(n_windows, n_entries)) | |
ii, jj = np.mgrid[:n_windows, :n_entries] | |
for i, j in np.c_[ii.ravel(), jj.ravel()]: | |
results[i, j] = lb_keogh_from_bounds( | |
db[j], envelope_windows[i, 0], envelope_windows[i, 1]) | |
return results |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
def window_ts(X, size, step_size=1, axis=None): | |
if axis is None: | |
axis = X.ndim - 1 | |
n_windows = (X.shape[axis] - size) // step_size + 1 | |
shape = X.shape[:axis] + (n_windows, size, ) + X.shape[axis + 1:] | |
new_strides = (X.strides[axis] * step_size, X.strides[axis]) | |
strides = (X.strides[:axis] + new_strides + X.strides[axis + 1:]) | |
return np.lib.stride_tricks.as_strided(X, shape=shape, strides=strides) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment