# partitioning_around_medoids.py
# Forked from eriklindernoren/ML-From-Scratch

from sklearn import datasets
import numpy as np

# Import helper functions from the mlfromscratch package
from mlfromscratch.utils.data_operation import euclidean_distance
from mlfromscratch.utils import Plot


class PAM():
"""A simple clustering method that forms k clusters by first assigning
samples to the closest medoids, and then swapping medoids with non-medoid
samples if the total distance (cost) between the cluster members and their medoid
is smaller than prevoisly.
Parameters:
-----------
k: int
The number of clusters the algorithm will form.
"""
def __init__(self, k=2):
self.k = k

    def _init_random_medoids(self, X):
        """ Initialize the medoids as k distinct random samples """
        n_samples, n_features = np.shape(X)
        medoids = np.zeros((self.k, n_features))
        # Sample k indices without replacement so that no two medoids
        # start out as the same data point
        indices = np.random.choice(n_samples, self.k, replace=False)
        for i, sample_i in enumerate(indices):
            medoids[i] = X[sample_i]
        return medoids

    def _closest_medoid(self, sample, medoids):
""" Return the index of the closest medoid to the sample """
closest_i = None
closest_distance = float("inf")
for i, medoid in enumerate(medoids):
distance = euclidean_distance(sample, medoid)
if distance < closest_distance:
closest_i = i
closest_distance = distance
return closest_i

    def _create_clusters(self, X, medoids):
""" Assign the samples to the closest medoids to create clusters """
clusters = [[] for _ in range(self.k)]
for sample_i, sample in enumerate(X):
medoid_i = self._closest_medoid(sample, medoids)
clusters[medoid_i].append(sample_i)
return clusters

    def _calculate_cost(self, X, clusters, medoids):
""" Calculate the cost (total distance between samples and their medoids) """
cost = 0
# For each cluster
for i, cluster in enumerate(clusters):
medoid = medoids[i]
for sample_i in cluster:
# Add distance between sample and medoid as cost
cost += euclidean_distance(X[sample_i], medoid)
return cost

    def _get_non_medoids(self, X, medoids):
        """ Returns a list of all samples that are not currently medoids """
        non_medoids = []
        for sample in X:
            # Compare row-wise: a sample is a medoid only if it matches
            # every feature of some medoid
            if not np.any(np.all(sample == medoids, axis=1)):
                non_medoids.append(sample)
        return non_medoids

    def _get_cluster_labels(self, clusters, X):
        """ Classify samples as the index of their clusters """
        # One prediction for each sample
        y_pred = np.zeros(np.shape(X)[0])
        for cluster_i, cluster in enumerate(clusters):
for sample_i in cluster:
y_pred[sample_i] = cluster_i
return y_pred

    def predict(self, X):
""" Do Partitioning Around Medoids and return the cluster labels """
# Initialize medoids randomly
medoids = self._init_random_medoids(X)
# Assign samples to closest medoids
clusters = self._create_clusters(X, medoids)
# Calculate the initial cost (total distance between samples and
# corresponding medoids)
cost = self._calculate_cost(X, clusters, medoids)
# Iterate until we no longer have a cheaper cost
while True:
best_medoids = medoids
lowest_cost = cost
            # Get all non-medoid samples once per pass (the swap candidates)
            non_medoids = self._get_non_medoids(X, medoids)
            for i in range(len(medoids)):
                # Calculate the cost when swapping this medoid with each candidate
                for sample in non_medoids:
                    # Swap the medoid at index i with the candidate sample
                    new_medoids = medoids.copy()
                    new_medoids[i] = sample
# Assign samples to new medoids
new_clusters = self._create_clusters(X, new_medoids)
# Calculate the cost with the new set of medoids
new_cost = self._calculate_cost(
X, new_clusters, new_medoids)
# If the swap gives us a lower cost we save the medoids and cost
if new_cost < lowest_cost:
lowest_cost = new_cost
best_medoids = new_medoids
            # If there was a swap that resulted in a lower cost we save the
            # resulting medoids from the best swap and the new cost
if lowest_cost < cost:
cost = lowest_cost
medoids = best_medoids
# Else finished
else:
break
final_clusters = self._create_clusters(X, medoids)
# Return the samples cluster indices as labels
return self._get_cluster_labels(final_clusters, X)


def main():
    # Generate a toy dataset of Gaussian blobs (3 centers by default)
    X, y = datasets.make_blobs()

    # Cluster the data using the PAM (K-Medoids) algorithm
    clf = PAM(k=3)
    y_pred = clf.predict(X)

    # Plot the clusters in 2d; Plot projects the data onto its
    # two primary principal components before plotting
    p = Plot()
    p.plot_in_2d(X, y_pred, title="PAM Clustering")
    p.plot_in_2d(X, y, title="Actual Clustering")


if __name__ == "__main__":
main()
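

# Usage sketch: a minimal check of clustering quality against the known blob
# labels, assuming scikit-learn's metrics module is available alongside the
# make_blobs helper used above.
#
#   from sklearn.metrics import adjusted_rand_score
#
#   X, y = datasets.make_blobs(n_samples=300, centers=3, random_state=0)
#   y_pred = PAM(k=3).predict(X)
#   print("Adjusted Rand index:", adjusted_rand_score(y, y_pred))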