Last active
February 22, 2024 08:48
-
-
Save STARasGAMES/979d5b6329f673401643eccaaa643094 to your computer and use it in GitHub Desktop.
Very messy and not 100% correct ISODATA clustering algorithm implementation on Python that works with vectors. Source: https://github.com/PyRadar/pyradar/blob/master/pyradar/classifiers/isodata.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Very messy and not 100% correct ISODATA clustering algorithm implementation that works with vectors. | |
# Source: https://github.com/PyRadar/pyradar/blob/master/pyradar/classifiers/isodata.py | |
# it works only for one-dimensional vectors :-( | |
# P.S. | |
# I've made it for university lab that's why you can find test values at the end of the file. (and that's why code so flipping bad XD) | |
# And I was shocked that there is no complete implementation of this algorithm on the internet o_O. | |
# Hope this helped:3 | |
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Copyright 2012 - 2013 | |
# Matías Herranz <[email protected]> | |
# Joaquín Tita <[email protected]> | |
# | |
# https://github.com/PyRadar/pyradar | |
# | |
# This library is free software; you can redistribute it and/or | |
# modify it under the terms of the GNU Lesser General Public | |
# License as published by the Free Software Foundation; either | |
# version 3 of the License, or (at your option) any later version. | |
# | |
# This library is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
# Lesser General Public License for more details. | |
# | |
# You should have received a copy of the GNU Lesser General Public | |
# License along with this library. If not, see <http://www.gnu.org/licenses/>. | |
import numpy as np | |
from scipy.cluster import vq | |
#from pyradar.utils import take_snapshot | |
def initialize_parameters(parameters=None): | |
"""Auxiliar function to set default values to all the parameters not | |
given a value by the user. | |
""" | |
parameters = {} if not parameters else parameters | |
def safe_pull_value(parameters, key, default): | |
return parameters.get(key, default) | |
# number of clusters desired | |
K = safe_pull_value(parameters, 'K', 5) | |
# maximum number of iterations | |
I = safe_pull_value(parameters, 'I', 100) | |
# maximum of number of pairs of clusters which can be merged | |
P = safe_pull_value(parameters, 'P', 4) | |
# threshold value for minimum number of samples in each cluster | |
# (discarding clusters) | |
THETA_M = safe_pull_value(parameters, 'THETA_M', 10) | |
# threshold value for standard deviation (for split) | |
THETA_S = safe_pull_value(parameters, 'THETA_S', 1) | |
# threshold value for pairwise distances (for merge) | |
THETA_C = safe_pull_value(parameters, 'THETA_C', 20) | |
# percentage of change in clusters between each iteration | |
#(to stop algorithm) | |
THETA_O = 0.01 | |
#can use any of both fixed or random | |
# number of starting clusters | |
#k = np.random.randint(1, K) | |
k = safe_pull_value(parameters, 'k', K) | |
ret = locals() | |
ret.pop('safe_pull_value') | |
ret.pop('parameters') | |
globals().update(ret) | |
def quit_low_change_in_clusters(centers, last_centers, iter): | |
"""Stop algorithm by low change in the clusters values between each | |
iteration. | |
:returns: True if should stop, otherwise False. | |
""" | |
quit = False | |
if centers.shape == last_centers.shape: | |
thresholds = np.abs((centers - last_centers) / (last_centers + 1)) #todo calculate distance between points not between axis | |
if np.all(thresholds <= THETA_O): # percent of change in [0:1] | |
quit = True | |
print("Isodata(info): Stopped by low threshold at the centers.") | |
print("Iteration step: %s" % iter) | |
return quit | |
def merge_clusters(img_class_flat, centers, clusters_list): | |
""" | |
Merge by pair of clusters in 'below_threshold' to form new clusters. | |
""" | |
pair_dists = compute_pairwise_distances(centers) | |
first_p_elements = pair_dists[:P] | |
below_threshold = [(c1, c2) for d, (c1, c2) in first_p_elements | |
if d < THETA_C] | |
if below_threshold: | |
k = centers.shape[0] | |
count_per_cluster = np.zeros(k) | |
to_add = np.empty((0,centers.shape[1]), float) # new clusters to add | |
to_delete = np.array([]) # clusters to delete | |
for cluster in range(0, k): | |
result = np.where(img_class_flat == clusters_list[cluster]) | |
indices = result[0] | |
count_per_cluster[cluster] = indices.size | |
for c1, c2 in below_threshold: | |
c1_count = float(count_per_cluster[c1]) + 1 | |
c2_count = float(count_per_cluster[c2]) | |
factor = 1.0 / (c1_count + c2_count) | |
weight_c1 = c1_count * centers[c1] | |
weight_c2 = c2_count * centers[c2] | |
value = factor * (weight_c1 + weight_c2) | |
to_add = np.append(to_add, [value], axis=0) #todo check | |
to_delete = np.append(to_delete, [c1, c2]) | |
#delete old clusters and their indices from the availables array | |
centers = np.delete(centers, to_delete, axis=0) | |
clusters_list = np.delete(clusters_list, to_delete) | |
#generate new indices for the new clusters | |
#starting from the max index 'to_add.size' times | |
start = int(clusters_list.max())+1 | |
end = to_add.shape[0] + start | |
centers = np.append(centers, to_add, axis=0) #todo check | |
clusters_list = np.append(clusters_list, range(start, end)) | |
centers, clusters_list = sort_arrays_by_first(centers, clusters_list) | |
return centers, clusters_list | |
def compute_pairwise_distances(centers): | |
""" | |
Compute the pairwise distances 'pair_dists', between every two clusters | |
centers and returns them sorted. | |
Returns: | |
- a list with tuples, where every tuple has in it's first coord the | |
distance between to clusters, and in the second coord has a tuple, | |
with the numbers of the clusters measured. | |
Output example: | |
[(d1,(cluster_1,cluster_2)), | |
(d2,(cluster_3,cluster_4)), | |
... | |
(dn, (cluster_n,cluster_n+1))] | |
""" | |
pair_dists = [] | |
size = centers.shape[0] | |
for i in range(0, size): | |
for j in range(0, size): | |
if i > j: | |
#d = np.abs(centers[i] - centers[j]) | |
d = np.linalg.norm(centers[i] - centers[j]) | |
pair_dists.append((d, (i, j))) | |
#return it sorted on the first elem | |
res = sorted(pair_dists) #todo check | |
return res | |
def split_clusters(img_flat, img_class_flat, centers, clusters_list): | |
""" | |
Split clusters to form new clusters. | |
""" | |
assert centers.shape[0] == clusters_list.size, \ | |
"ERROR: split() centers and clusters_list size are different" | |
delta = 1 | |
k = centers.shape[0] | |
count_per_cluster = np.zeros(k) | |
stddev = np.array([]) | |
avg_dists_to_clusters = compute_avg_distance(img_flat, img_class_flat, | |
centers, clusters_list) | |
d = compute_overall_distance(img_class_flat, avg_dists_to_clusters, | |
clusters_list) | |
# compute all the standard deviation of the clusters | |
for cluster in range(0, k): | |
indices = np.where(img_class_flat == clusters_list[cluster])[0] | |
count_per_cluster[cluster] = indices.size | |
if count_per_cluster[cluster] < 1: | |
pass | |
points = img_flat[indices] | |
cluster_center = centers[cluster] | |
vectors = points - cluster_center | |
magnitudes = (vectors * vectors).sum(axis=1) ** 0.5 | |
value = (magnitudes).sum() #todo fix this formula | |
value /= count_per_cluster[cluster] | |
stddev = np.append(stddev, value) | |
cluster = stddev.argmax() | |
max_stddev = stddev[cluster] | |
max_clusters_list = int(clusters_list.max()) | |
c = 1 # coeficient of proportion 0 < c < 1 (idk what it is and what should be) | |
delta = max_stddev * c | |
if max_stddev > THETA_S: | |
if avg_dists_to_clusters[cluster] >= d: | |
if count_per_cluster[cluster] >= (2.0 * THETA_M): | |
old_cluster = centers[cluster] | |
new_cluster_1 = old_cluster + delta | |
new_cluster_2 = old_cluster - delta | |
centers = np.delete(centers, cluster, axis=0) | |
clusters_list = np.delete(clusters_list, cluster) | |
centers = np.append(centers, [new_cluster_1], axis=0) | |
centers = np.append(centers, [new_cluster_2], axis=0) | |
clusters_list = np.append(clusters_list, [cluster, | |
(max_clusters_list + 1)]) | |
centers, clusters_list = sort_arrays_by_first(centers, | |
clusters_list) | |
assert centers.shape[0] == clusters_list.size, \ | |
"ERROR: split() centers and clusters_list size are different" | |
return centers, clusters_list | |
def compute_overall_distance(img_class_flat, avg_dists_to_clusters, | |
clusters_list): | |
""" | |
Computes the overall distance of the samples from their respective cluster | |
centers. | |
""" | |
k = avg_dists_to_clusters.size | |
total = img_class_flat.size | |
count_per_cluster = np.zeros(k) | |
for cluster in range(0, k): | |
indices = np.where(img_class_flat == clusters_list[cluster])[0] | |
count_per_cluster[cluster] = indices.size | |
d = ((count_per_cluster / total) * avg_dists_to_clusters).sum() | |
return d | |
def compute_avg_distance(img_flat, img_class_flat, centers, clusters_list): | |
""" | |
Computes all the average distances to the center in each cluster. | |
""" | |
k = centers.shape[0] | |
avg_dists_to_clusters = np.array([]) | |
for cluster in range(0, k): | |
indices = np.where(img_class_flat == clusters_list[cluster])[0] | |
total_per_cluster = indices.size + 1 | |
sum_per_cluster = 0 | |
points = img_flat[indices] | |
for point in points: | |
sum_per_cluster = sum_per_cluster + np.linalg.norm(point - centers[cluster]) | |
#sum_per_cluster = (np.abs(img_flat[indices] - centers[cluster])).sum() | |
##todo euclidean distance | |
dj = (sum_per_cluster / float(total_per_cluster)) | |
avg_dists_to_clusters = np.append(avg_dists_to_clusters, dj) | |
return avg_dists_to_clusters | |
def discard_clusters(img_class_flat, centers, clusters_list): | |
""" | |
Discard clusters with fewer than THETA_M. | |
""" | |
k = centers.shape[0] | |
to_delete = np.array([]) | |
assert k == clusters_list.size, \ | |
"ERROR: discard_cluster() centers and clusters_list size are different" | |
for cluster in range(0, k): | |
indices = np.where(img_class_flat == clusters_list[cluster])[0] | |
total_per_cluster = indices.size | |
if total_per_cluster < THETA_M: | |
to_delete = np.append(to_delete, cluster) | |
if to_delete.size: | |
new_centers = np.delete(centers, to_delete, axis=0) | |
new_clusters_list = np.delete(clusters_list, to_delete) | |
else: | |
new_centers = centers | |
new_clusters_list = clusters_list | |
new_centers, new_clusters_list = sort_arrays_by_first(new_centers, | |
new_clusters_list) | |
# shape_bef = centers.shape[0] | |
# shape_aft = new_centers.shape[0] | |
# print "Isodata(info): Discarded %s clusters." % (shape_bef - | |
# shape_aft) | |
# if to_delete.size: | |
# print "Clusters discarded %s" % to_delete | |
assert new_centers.shape[0] == new_clusters_list.size, \ | |
"ERROR: discard_cluster() centers and clusters_list size are different" | |
return new_centers, new_clusters_list | |
def update_clusters(img_flat, img_class_flat, centers, clusters_list): | |
""" Update clusters. """ | |
k = centers.shape[0] | |
new_centers = []#np.array([]) | |
new_clusters_list = np.array([]) | |
assert centers.shape[0] == clusters_list.size, \ | |
"ERROR: update_clusters() centers and clusters_list size are different" | |
for cluster in range(0, k): | |
indices = np.where(img_class_flat == clusters_list[cluster])[0] | |
#get whole cluster | |
cluster_values = img_flat[indices] | |
#sum and count the values | |
sum_per_cluster = cluster_values.sum(axis = 0) | |
total_per_cluster = (cluster_values.shape[0]) + 1 | |
#compute the new center of the cluster | |
new_cluster = sum_per_cluster / total_per_cluster | |
new_centers.append(new_cluster) | |
#new_centers = np.concatenate(([new_centers], [new_cluster]), axis=0) | |
new_clusters_list = np.append(new_clusters_list, cluster) | |
new_centers = np.asarray(new_centers)# | |
new_centers, new_clusters_list = sort_arrays_by_first(new_centers, | |
new_clusters_list) | |
assert new_centers.shape[0] == new_clusters_list.size, \ | |
"ERROR: update_clusters() centers and clusters_list size are different" | |
return new_centers, new_clusters_list | |
def initial_clusters(img_flat, k, method="linspace"): | |
""" | |
Define initial clusters centers as startup. | |
By default, the method is "linspace". Other method available is "random". | |
""" | |
methods_availables = ["linspace", "random"] | |
assert method in methods_availables, "ERROR: method %s is no valid." \ | |
"Methods availables %s" % (method, methods_availables) | |
if method == "linspace": | |
max, min = img_flat.max(), img_flat.min() | |
centers = np.linspace(min, max, k) | |
elif method == "random": | |
start, end = 0, img_flat.shape[0] | |
indices = np.random.randint(start, end, k) | |
centers = img_flat.take(indices, axis=0) | |
return centers | |
def sort_arrays_by_first(centers, clusters_list): | |
""" | |
Sort the array 'centers' and the with indices of the sorted centers | |
order the array 'clusters_list'. | |
Example: centers=[22, 33, 0, 11] and cluster_list=[7,6,5,4] | |
returns (array([ 0, 11, 22, 33]), array([5, 4, 7, 6])) | |
""" | |
assert centers.shape[0] == clusters_list.size, \ | |
"ERROR: sort_arrays_by_first centers and clusters_list size are not equal" | |
# return as it is because we don't really need to sort this thing? or no? | |
return centers, clusters_list | |
indices = np.argsort(centers) | |
sorted_centers = centers[indices] | |
sorted_clusters_list = clusters_list[indices] | |
return sorted_centers, sorted_clusters_list | |
def isodata_classification(img, parameters=None, initial_centers=None): | |
""" | |
Classify a numpy 'img' using Isodata algorithm. | |
Parameters: a dictionary with the following keys. | |
- img: an input numpy array that contains the image to classify. | |
- parameters: a dictionary with the initial values. | |
If 'parameters' are not specified, the algorithm uses the default | |
ones. | |
+ number of clusters desired. | |
K = 15 | |
+ max number of iterations. | |
I = 100 | |
+ max number of pairs of clusters which can be ,erged. | |
P = 2 | |
+ threshold value for min number in each cluster. | |
THETA_M = 10 | |
+ threshold value for standard deviation (for split). | |
THETA_S = 0.1 | |
+ threshold value for pairwise distances (for merge). | |
THETA_C = 2 | |
+ threshold change in the clusters between each iter. | |
THETA_O = 0.01 | |
Note: if some(or all) parameters are nos providen, default values | |
will be used. | |
Returns: | |
- img_class: a numpy array with the classification. | |
- centers: a numpy array with centers of the clusters | |
""" | |
global K, I, P, THETA_M, THETA_S, THEHTA_C, THETA_O, k | |
initialize_parameters(parameters) | |
np.random.seed(0) | |
N, M = img.shape # for reshaping at the end | |
img_flat = np.array(img)#img.flatten() | |
if (initial_centers != None): | |
centers = np.array(initial_centers) | |
else: | |
centers = initial_clusters(img_flat, k, "random") | |
k = centers.shape[0] | |
print("Isodata(info): Starting algorithm with %s classes" % k) | |
clusters_list = np.arange(k) # number of clusters availables | |
for iter in range(0, I): | |
print ("Isodata(info): Iteration:%s Num Clusters:%s %s" % (iter, k, centers)) | |
last_centers = centers.copy() | |
# assing each of the samples to the closest cluster center | |
img_class_flat, dists = vq.vq(img_flat, centers) | |
centers, clusters_list = discard_clusters(img_class_flat, centers, clusters_list) | |
img_class_flat, dists = vq.vq(img_flat, centers) | |
centers, clusters_list = update_clusters(img_flat, img_class_flat, centers, clusters_list) | |
k = centers.shape[0] | |
if k < K: # too few clusters => split clusters (previously there was K/2) | |
centers, clusters_list = split_clusters(img_flat, img_class_flat, centers, clusters_list) | |
elif k > K: # too many clusters => merge clusters (previously there was K/2) | |
centers, clusters_list = merge_clusters(img_class_flat, centers, clusters_list) | |
else: # nor split or merge are needed | |
pass | |
k = centers.shape[0] | |
############################################################################### | |
if quit_low_change_in_clusters(centers, last_centers, iter): | |
break | |
# take_snapshot(img_class_flat.reshape(N, M), iteration_step=iter) | |
############################################################################### | |
print("Isodata(info): Finished with %s classes" % k) | |
print("Isodata(info): Number of Iterations: %s" % (iter + 1)) | |
img_class_flat, dists = vq.vq(img_flat, centers) | |
return img_class_flat, centers | |
def run_test(img, params, centers): | |
classes, centers = isodata_classification(img, parameters=params, initial_centers=centers) | |
print("Classes: ", classes) | |
print("Centers: ", centers) | |
print("Details:") | |
for point in range(0,img.shape[0]): | |
print(img[point], " belongs to ", centers[classes[point]]) | |
""" | |
+ number of clusters desired. | |
K = 15 | |
+ max number of iterations. | |
I = 100 | |
+ max number of pairs of clusters which can be merged. | |
P = 2 | |
+ threshold value for min number in each cluster. | |
THETA_M = 10 | |
+ threshold value for standard deviation (for split). | |
THETA_S = 0.1 | |
+ threshold value for pairwise distances (for merge). | |
THETA_C = 2 | |
+ threshold change in the clusters between each iter. | |
THETA_O = 0.01 | |
""" | |
#Test 1.1 | |
print("=============== TEST-1.1 ===============") | |
img = np.array([[0,0], [1,1], [2,2], [4,3], [5,3], [4,4], [5,4], [6,5]]) | |
centers = [[0,0]] | |
params = { | |
"K": 2, | |
"THETA_M": 1, | |
"THETA_S": 1, | |
"THETA_C": 4 | |
} | |
#run_test(img, params, centers) | |
#Test 1.2 | |
print("=============== TEST-1.2 ===============") | |
centers = [[0,0],[1,1]] | |
params = { | |
"K": 3, | |
"THETA_M": 2, | |
"THETA_S": 0.8, | |
"THETA_C": 3 | |
} | |
#run_test(img, params, centers) | |
#Test 1.3 | |
print("=============== TEST-1.3 ===============") | |
centers = [[0,0],[4,4],[6,5]] | |
params = { | |
"K": 4, | |
"THETA_M": 2, | |
"THETA_S": 0.5, | |
"THETA_C": 2 | |
} | |
#run_test(img, params, centers) | |
#Test 2.1 | |
print("=============== TEST-2.1 ===============") | |
img = np.array([[0,0], [1,0], [0,1], [1,1], [2,1], [1,2], [3,2], [1,7], [0,7], [0,8], [1,8], [0,9], [2,8], [2,9], [6,6], [7,6], [8,6], [7,7], [8,8], [9,9]]) | |
centers = [[0,0]] | |
params = { | |
"K": 3, | |
"THETA_M": 2, | |
"THETA_S": 1, | |
"THETA_C": 4 | |
} | |
#run_test(img, params, centers) | |
#Test 2.2 | |
print("=============== TEST-2.2 ===============") | |
centers = [[0,0],[1,0],[0,1]] | |
params = { | |
"K": 4, | |
"THETA_M": 1, | |
"THETA_S": 0.5, | |
"THETA_C": 3 | |
} | |
#run_test(img, params, centers) | |
#Test 2.3 | |
print("=============== TEST-2.3 ===============") | |
centers = [[0,0],[9,9],[0,8],[6,6]] | |
params = { | |
"K": 2, | |
"THETA_M": 4, | |
"THETA_S": 1.2, | |
"THETA_C": 5 | |
} | |
run_test(img, params, centers) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment