Created
September 1, 2015 13:05
-
-
Save chalup/46960f82d3870d20b7b6 to your computer and use it in GitHub Desktop.
OPTICS implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import numpy as np | |
from sklearn.neighbors import BallTree | |
class Optics: | |
def __init__(self, eps, min_pts): | |
self.eps = eps | |
self.min_pts = min_pts | |
def get_reachability_plot(self, data): | |
n = data.shape[0] | |
assert n >= self.min_pts, "Input data should have at least min_pts (%) samples".format(self.min_pts) | |
ordering = [] | |
core_distances = np.ones(n) * np.nan | |
reachability_distances = np.ones(n) * np.inf | |
seeds = np.ones(n, dtype=bool) | |
ball_tree = BallTree(data, p=1) | |
# calculate core distance for each core point | |
neighbors_count = ball_tree.query_radius(data, r=self.eps, count_only=True) | |
core_points_mask = neighbors_count >= self.min_pts | |
assert any(core_points_mask), "Input data contains no core points for specified eps and min_pts" | |
core_distances[core_points_mask] = ball_tree.query(data[core_points_mask], k=self.min_pts)[0][:, -1] | |
# start from core point | |
i = core_points_mask.nonzero()[0][0] | |
while True: | |
seeds[i] = False | |
ordering.append(i) | |
core_distance = core_distances[i] | |
if not math.isnan(core_distance): | |
assert core_distance <= self.eps | |
distances, indices = ball_tree.query(data[i], k=neighbors_count[i]) | |
seeds_mask = seeds[indices[0]] | |
neighbors = indices[0][seeds_mask] | |
new_reachability_distances = np.minimum( | |
reachability_distances[neighbors], | |
np.maximum(core_distance, distances[0][seeds_mask]) | |
) | |
reachability_distances[neighbors] = new_reachability_distances | |
# continue with next seed point with reachability distance | |
i = seeds.nonzero()[0][np.argmin(reachability_distances[seeds])] | |
if not math.isinf(reachability_distances[i]): | |
continue | |
# if that's not possible, use next unprocessed core point | |
unprocessed_core_points = seeds * core_points_mask | |
if any(unprocessed_core_points): | |
i = unprocessed_core_points.nonzero()[0][0] | |
continue | |
# done | |
break | |
# add all remaining points to ordering | |
ordering += seeds.nonzero()[0].tolist() | |
assert len(ordering) == n | |
return ordering, reachability_distances |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment