-
Notifications
You must be signed in to change notification settings - Fork 0
/
game_theoretic_clusterization.py
189 lines (152 loc) · 8.5 KB
/
game_theoretic_clusterization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import numpy as np
from scipy.sparse import lil_array, csr_array
import cv2
from utils import actualsize
import psutil
class GameTheoreticClusterization:
    """Segment a single-channel image by peeling off clusters with replicator dynamics.

    Typical use: construct the object (which loads the image by default) and
    call :meth:`clusterization`. The label image is then available as
    ``final_seg`` and the number of kept clusters as ``final_cluster_count``.

    Args:
        image_path: Path handed to ``cv2.imread``.
        rep_dyn_t_max: Number of time steps stored by the replicator dynamics
            (the first row is the initial distribution).
        remove_small_clust_att: Stored on the instance; not read by any method
            of this class.
        cluster_size_thresh_perc: Fraction of all pixels below which a cluster
            counts as "small" (only used when ``given_n_final_clusters`` is
            ``None``).
        given_n_final_clusters: If set, keep exactly this many largest clusters
            and merge all others into them.
        sigma: Scale of the intensity term of the similarity.
        sigma_dist: Scale of the spatial-distance term of the similarity.
        max_iter: Maximum number of cluster-extraction rounds.
        use_measure_memory_usage: If true, ``clusterization`` records memory
            statistics after the similarity matrix has been built.
        load_image_at_start: If true, the image is read in the constructor.
        img_to_8bit_gr: If true, read the image as 8-bit grayscale; otherwise
            read it with ``cv2.IMREAD_ANYDEPTH``.
    """

    def __init__(self, image_path, rep_dyn_t_max, remove_small_clust_att=10, cluster_size_thresh_perc=0.01,
                 given_n_final_clusters=None, sigma=1, sigma_dist=1, max_iter=100, use_measure_memory_usage=False,
                 load_image_at_start=True, img_to_8bit_gr=True):
        # Input configuration.
        self.image_path = image_path
        self.img_to_8bit_gr = img_to_8bit_gr
        self.sigma = sigma
        self.sigma_dist = sigma_dist
        self.rep_dyn_t_max = rep_dyn_t_max
        self.max_iter = max_iter
        self.cluster_size_thresh_perc = cluster_size_thresh_perc
        self.remove_small_clust_att = remove_small_clust_att
        self.given_n_final_clusters = given_n_final_clusters
        self.use_measure_memory_usage = use_measure_memory_usage

        # State filled in by the processing methods.
        self.image = None
        self.org_image_dtype = None
        self.sim_matrix = None
        self.indices_vec = None
        self.prob_in_time = None
        self.final_seg = None
        self.final_cluster_count = 0
        self.large_cluster_elements_thresh = 0
        self.cluster_size_thresh_too_big = False

        # Memory statistics (see measure_memory_usage).
        self.sim_matrix_size_in_memory = None
        self.sim_matrix_size_in_memory_if_dense = None
        self.all_memory_used = None

        if load_image_at_start:
            self.load_image()

    def measure_memory_usage(self):
        """Record memory statistics on the instance, each divided by 1024 * 1024.

        Sets ``sim_matrix_size_in_memory`` (from ``actualsize(self.sim_matrix)``;
        presumably a byte count -- confirm against ``utils.actualsize``),
        ``sim_matrix_size_in_memory_if_dense`` (8 bytes per entry of a full
        pixels-by-pixels matrix) and ``all_memory_used`` (resident set size of
        the current process as reported by ``psutil``).
        """
        scale = 1024 * 1024
        n_pixels = self.image.shape[0] * self.image.shape[1]
        self.sim_matrix_size_in_memory = actualsize(self.sim_matrix) / scale
        self.sim_matrix_size_in_memory_if_dense = ((n_pixels ** 2) * 8) / scale
        self.all_memory_used = psutil.Process().memory_info().rss / scale

    def load_image(self):
        """Read ``image_path`` into ``self.image`` as a float64 array.

        The dtype of the array returned by OpenCV is remembered in
        ``org_image_dtype`` before the conversion to float64.

        Raises:
            OSError: If OpenCV could not read the file (``cv2.imread``
                returned ``None``).
        """
        read_flag = cv2.IMREAD_GRAYSCALE if self.img_to_8bit_gr else cv2.IMREAD_ANYDEPTH
        image = cv2.imread(self.image_path, read_flag)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f'Could not read image from "{self.image_path}"')
        self.org_image_dtype = image.dtype
        self.image = np.array(image, dtype=np.float64)

    def generate_similarity_matrix(self):
        """Build the pixel-to-pixel similarity matrix and store it in ``self.sim_matrix``.

        For pixels ``p`` and ``q`` (enumerated row by row) the similarity is
        ``exp(-(I_p - I_q)^2 / sigma^2) * exp(-d_pq^2 / sigma_dist^2)``, where
        ``d_pq`` is the Euclidean pixel distance rescaled so that the image
        diagonal equals the maximum value of the original integer dtype.
        Entries that evaluate to exactly zero are not stored.

        Side effects: ``self.sigma`` and ``self.sigma_dist`` are converted to
        ``np.float64``.

        Raises:
            ValueError: From ``np.iinfo`` if ``org_image_dtype`` is not an
                integer dtype.
        """
        self.sigma = np.float64(self.sigma)
        self.sigma_dist = np.float64(self.sigma_dist)

        n_rows, n_cols = self.image.shape[0], self.image.shape[1]
        n_pixels = n_rows * n_cols
        row_idx, col_idx = np.indices(self.image.shape, dtype=np.float64)
        max_image_val = np.float64(np.iinfo(self.org_image_dtype).max)
        max_dist = np.float64(np.sqrt(np.square(n_rows - 1) + np.square(n_cols - 1)))
        if max_dist == 0:
            # 1x1 image: every distance is 0, so any non-zero divisor avoids 0/0.
            max_dist = np.float64(1.0)
        sigma_sq = np.square(self.sigma)
        sigma_dist_sq = np.square(self.sigma_dist)

        # Assemble the CSR buffers directly instead of assigning rows into a
        # LIL matrix; only the non-zero entries of each row are kept.
        data_parts = []
        index_parts = []
        indptr = [0]
        for i in range(n_rows):
            for j in range(n_cols):
                intensity_term = np.exp(-np.square(self.image[i, j] - self.image) / sigma_sq)
                distance = np.sqrt(np.square(i - row_idx) + np.square(j - col_idx))
                distance = (distance / max_dist) * max_image_val
                distance_term = np.exp(-np.square(distance) / sigma_dist_sq)
                row = (intensity_term * distance_term).ravel()
                nonzero = np.flatnonzero(row)
                index_parts.append(nonzero)
                data_parts.append(row[nonzero])
                indptr.append(indptr[-1] + nonzero.size)

        if data_parts:
            data = np.concatenate(data_parts)
            indices = np.concatenate(index_parts)
        else:
            data = np.zeros(0, dtype=np.float64)
            indices = np.zeros(0, dtype=np.int64)
        sim_matrix = csr_array((data, indices, np.array(indptr)), shape=(n_pixels, n_pixels), dtype=np.float64)
        # Both factors are symmetric in (p, q), so the transpose holds the same
        # values; it is kept so the stored object is the same as before.
        self.sim_matrix = sim_matrix.transpose(copy=False)

    def discrete_replicator_dynamics(self, first_prob_vec):
        """Iterate the discrete replicator dynamics and store every step.

        Each step computes ``x_next = x * (A x) / (x . A x)`` with
        ``A = self.sim_matrix``. The result, one row per time step with row 0
        equal to ``first_prob_vec``, is stored in ``self.prob_in_time`` as a
        ``csr_array`` of shape ``(rep_dyn_t_max, n)``.

        Args:
            first_prob_vec: Initial distribution with shape ``(1, n)``.

        Raises:
            ValueError: If ``rep_dyn_t_max`` is smaller than 1.
        """
        n_steps = self.rep_dyn_t_max
        if n_steps < 1:
            raise ValueError('"rep_dyn_t_max" has to be at least 1')
        if hasattr(first_prob_vec, 'toarray'):
            first_prob_vec = first_prob_vec.toarray()
        start = np.asarray(first_prob_vec, dtype=np.float64)

        # The trajectory is small (t_max x n) compared with the similarity
        # matrix, so it is iterated densely and converted to sparse once.
        history = np.zeros((n_steps, start.shape[1]), dtype=np.float64)
        history[0, :] = start[0]
        for t in range(n_steps - 1):
            current = history[t]
            payoff = self.sim_matrix @ current
            mean_payoff = current @ payoff
            if mean_payoff == 0:
                # Update is undefined without any payoff; keep the state.
                history[t + 1] = current
            else:
                history[t + 1] = current * (payoff / mean_payoff)
        self.prob_in_time = csr_array(history)

    def clusterization(self):
        """Run the full segmentation and store the label image in ``self.final_seg``.

        Builds the similarity matrix, then repeatedly starts the replicator
        dynamics from a random distribution over the still-unlabelled pixels.
        Pixels whose probability grew receive the current label and are removed
        (``self.sim_matrix`` is shrunk accordingly). This stops when no pixel
        is left or after ``max_iter`` rounds; pixels never labelled keep label
        0. Finally :meth:`merge_small_clusters` is applied.

        Raises:
            ValueError: If ``rep_dyn_t_max`` is smaller than 2 (step 1 of the
                dynamics is needed for the comparison).
        """
        if self.rep_dyn_t_max < 2:
            raise ValueError('"rep_dyn_t_max" has to be at least 2')

        n_pixels = self.image.shape[0] * self.image.shape[1]
        remaining = np.arange(n_pixels, dtype=int)

        print('Started generating similarity matrix...')
        self.generate_similarity_matrix()
        print('Finished generating similarity matrix...')
        print('Started clusterization...')
        if self.use_measure_memory_usage:
            self.measure_memory_usage()

        # Labels run from 1 to max_iter; pick the smallest unsigned type that
        # holds them (uint8 for the default) so large max_iter cannot overflow.
        label_dtype = np.min_scalar_type(max(int(self.max_iter), 1))
        seg = np.zeros(n_pixels, dtype=label_dtype)

        curr_label = 1
        while remaining.size > 0 and curr_label <= self.max_iter:
            a_init = np.random.uniform(size=(1, remaining.size))
            a_init = a_init / np.sum(a_init)
            self.discrete_replicator_dynamics(a_init)

            # NOTE(review): this compares step 1 (not the initial step 0) with
            # the last step -- confirm that skipping step 0 is intended.
            early = self.prob_in_time[[1], :].toarray().ravel()
            late = self.prob_in_time[[-1], :].toarray().ravel()
            grew = early < late
            keep = np.logical_not(grew)

            seg[remaining[grew]] = curr_label
            remaining = remaining[keep]
            self.sim_matrix = self.sim_matrix[:, keep]
            self.sim_matrix = self.sim_matrix[keep, :]
            curr_label += 1

        self.final_seg = np.reshape(seg, self.image.shape)
        print('Finished clusterization...')
        self.merge_small_clusters()

    @staticmethod
    def _neighbor_labels(seg, cluster_mask, dilated_mask):
        """Return the sorted unique labels on the ring ``dilated_mask`` minus ``cluster_mask``."""
        # The ring must be a boolean mask: indexing with a 0/1 integer array
        # would select rows 0 and 1 of ``seg`` instead of the ring pixels.
        ring = np.logical_and(np.asarray(dilated_mask).astype(bool), np.logical_not(cluster_mask))
        return np.unique(seg[ring])

    @staticmethod
    def _closest_large_cluster(neighbors, large_clusters, avg_colours, mean_colour):
        """Pick the large cluster whose mean intensity is closest to ``mean_colour``.

        Only large clusters listed in ``neighbors`` are considered; if there
        are none, all large clusters are candidates.
        """
        candidates, _, candidate_pos = np.intersect1d(neighbors, large_clusters, return_indices=True)
        if candidates.size == 0:
            candidates = large_clusters
            candidate_pos = np.arange(large_clusters.size)
        colour_diff = np.abs(avg_colours[candidate_pos] - mean_colour)
        return candidates[np.argmin(colour_diff)]

    def merge_small_clusters(self):
        """Merge small clusters of ``self.final_seg`` into large ones, in place.

        Large clusters are either those holding at least
        ``cluster_size_thresh_perc`` of all pixels or, when
        ``given_n_final_clusters`` is set, that many largest clusters. If the
        threshold leaves no large cluster, the single largest one is kept and
        ``cluster_size_thresh_too_big`` is set. Every small cluster is
        relabelled to the 4-connected neighbouring large cluster with the
        closest mean intensity (or, without such a neighbour, to the closest
        large cluster overall). ``final_cluster_count`` is set to the number of
        large clusters.

        Raises:
            ValueError: If ``given_n_final_clusters`` is set but is not a
                positive ``int``.
        """
        cluster_kinds, cluster_sizes = np.unique(self.final_seg, return_counts=True)

        if self.given_n_final_clusters is None:
            cluster_size_thresh = self.cluster_size_thresh_perc * self.final_seg.size
            self.large_cluster_elements_thresh = cluster_size_thresh
            is_large = cluster_sizes >= cluster_size_thresh
            large_clusters = cluster_kinds[is_large]
            small_clusters = cluster_kinds[np.logical_not(is_large)]
            if large_clusters.size == 0:
                # Threshold too strict: fall back to the single largest cluster.
                largest = np.argmax(cluster_sizes)
                large_clusters = cluster_kinds[[largest]]
                small_clusters = cluster_kinds[cluster_kinds != large_clusters[0]]
                self.cluster_size_thresh_too_big = True
        else:
            n_final = self.given_n_final_clusters
            # Type is checked first so a non-numeric value yields ValueError
            # rather than a TypeError from the comparison.
            if not isinstance(n_final, int) or n_final <= 0:
                raise ValueError('"given_n_final_clusters" has to be bigger than 0 and "int" type')
            by_size = np.argsort(cluster_sizes)
            large_clusters = cluster_kinds[by_size[-n_final:]]
            small_clusters = cluster_kinds[by_size[0:(-n_final)]]

        avg_colours = np.array(
            [np.mean(self.image[self.final_seg == l_cluster], dtype=np.float64) for l_cluster in large_clusters],
            dtype=np.float64,
        )

        # Cross-shaped structuring element: dilation adds the 4-connected neighbours.
        kernel = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], dtype=np.uint8)
        for s_cluster in small_clusters:
            cluster_mask = self.final_seg == s_cluster
            dilated = cv2.dilate(cluster_mask.astype(np.uint8), kernel, iterations=1)
            neighbors = self._neighbor_labels(self.final_seg, cluster_mask, dilated)
            mean_colour = np.mean(self.image[cluster_mask], dtype=np.float64)
            target = self._closest_large_cluster(neighbors, large_clusters, avg_colours, mean_colour)
            self.final_seg[cluster_mask] = target

        self.final_cluster_count = large_clusters.size