-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathegsmote.py
336 lines (297 loc) · 13.5 KB
/
egsmote.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
"""Class to perform over-sampling using Geometric SMOTE."""
# Author: Georgios Douzas <gdouzas@icloud.com>
# License: BSD 3 clause
# Modified By: Aathman Tharmasanthiran <aathmant.16@cse.mrt.ac.lk>
# Kayathiri Mahendrakumaran <kayathirim.16@cse.mrt.ac.lk>
# Vivek Vinushanth Christopher <vivekc.16@cse.mrt.ac.lk>
import numpy as np
from imblearn.utils._validation import _count_class_sample
from numpy.linalg import norm
from sklearn.utils import check_random_state
from imblearn.over_sampling.base import BaseOverSampler
from imblearn.utils import check_neighbors_object, Substitution
from imblearn.utils._docstring import _random_state_docstring
import collections
from sklearn.cluster import KMeans
from yellowbrick.cluster import KElbowVisualizer
def _make_geometric_sample(
center, surface_point, truncation_factor, deformation_factor, random_state
):
"""A support function that returns an artificial point inside
the geometric region defined by the center and surface points.
Parameters
----------
center : ndarray, shape (n_features, )
Center point of the geometric region.
surface_point : ndarray, shape (n_features, )
Surface point of the geometric region.
truncation_factor : float, optional (default=0.0)
The type of truncation. The values should be in the [-1.0, 1.0] range.
deformation_factor : float, optional (default=0.0)
The type of geometry. The values should be in the [0.0, 1.0] range.
random_state : int, RandomState instance or None
Control the randomization of the algorithm.
Returns
-------
point : ndarray, shape (n_features, )
Synthetically generated sample.
"""
# Zero radius case
if np.array_equal(center, surface_point):
return center
# Generate a point on the surface of a unit hyper-sphere
radius = norm(center - surface_point)
normal_samples = random_state.normal(size=center.size)
point_on_unit_sphere = normal_samples / norm(normal_samples)
point = (random_state.uniform(size=1) ** (1 / center.size)) * point_on_unit_sphere
# Parallel unit vector
parallel_unit_vector = (surface_point - center) / norm(surface_point - center)
# Truncation
close_to_opposite_boundary = (
truncation_factor > 0
and np.dot(point, parallel_unit_vector) < truncation_factor - 1
)
close_to_boundary = (
truncation_factor < 0
and np.dot(point, parallel_unit_vector) > truncation_factor + 1
)
if close_to_opposite_boundary or close_to_boundary:
point -= 2 * np.dot(point, parallel_unit_vector) * parallel_unit_vector
# Deformation
parallel_point_position = np.dot(point, parallel_unit_vector) * parallel_unit_vector
perpendicular_point_position = point - parallel_point_position
point = (
parallel_point_position + (1 - deformation_factor) * perpendicular_point_position
)
# Translation
point = center + radius * point
return point
@Substitution(
    sampling_strategy=BaseOverSampler._sampling_strategy_docstring,
    random_state=_random_state_docstring,
)
class EGSmote(BaseOverSampler):
    """Class to perform over-sampling using an extended Geometric SMOTE.

    This algorithm builds on Geometric SMOTE, a geometrically enhanced
    drop-in replacement for SMOTE as presented in [1]_. Each minority
    sample is classified by the labels of its nearest neighbours in the
    whole data set:

    * all neighbours are minority samples: a new point is generated towards
      a minority neighbour, limited by a per sub-cluster budget obtained
      from a k-means clustering of the minority class;
    * neighbours of both kinds: the nearer of the selected minority
      neighbour and the nearest non-minority sample defines the region;
    * no minority neighbour at all: the sample is skipped as noise.

    Read more in the :ref:`User Guide <user_guide>`.

    Parameters
    ----------
    {sampling_strategy}

    {random_state}

    truncation_factor : float, optional (default=1.0)
        The type of truncation. The values should be in the [-1.0, 1.0]
        range. The sampler selects the truncation itself for every
        generated point (1.0, -1.0 or 0.4 depending on the neighbourhood),
        so this value does not influence the generated samples; it is kept
        for interface compatibility.

    deformation_factor : float, optional (default=0.0)
        The type of geometry. The values should be in the [0.0, 1.0] range.

    k_neighbors : int or object, optional (default=5)
        If ``int``, number of nearest neighbours to use when synthetic
        samples are constructed for the minority method. If object, an
        estimator that inherits from
        :class:`sklearn.neighbors.base.KNeighborsMixin` that will be used to
        find the k_neighbors.

    n_jobs : int, optional (default=1)
        The number of threads to open if possible.

    sampling_rate : float, optional (default=0.25)
        Desired share of the minority class relative to the majority class
        after resampling: the minority class is grown to
        ``n_majority * sampling_rate / (1 - sampling_rate)`` samples.
        Must be different from 1.

    Notes
    -----
    See the original paper: [1]_ for more details.

    Only the least frequent class is over-sampled; the number of generated
    samples is derived from ``sampling_rate`` (see ``min_oversample``).

    References
    ----------
    .. [1] G. Douzas, F. Bacao, "Geometric SMOTE:
       a geometrically enhanced drop-in replacement for SMOTE",
       Information Sciences, vol. 501, pp. 118-135, 2019.

    .. [2] N. V. Chawla, K. W. Bowyer, L. O. Hall, W. P. Kegelmeyer, "SMOTE:
       synthetic minority over-sampling technique", Journal of Artificial
       Intelligence Research, vol. 16, pp. 321-357, 2002.

    Examples
    --------
    >>> from collections import Counter
    >>> from sklearn.datasets import make_classification
    >>> from gsmote import EGSmote # doctest: +NORMALIZE_WHITESPACE
    >>> X, y = make_classification(n_classes=2, class_sep=2,
    ... weights=[0.1, 0.9], n_informative=3, n_redundant=1, flip_y=0,
    ... n_features=20, n_clusters_per_class=1, n_samples=1000, random_state=10)
    >>> print('Original dataset shape %s' % Counter(y))
    Original dataset shape Counter({{1: 900, 0: 100}})
    >>> gsmote = EGSmote(random_state=1)
    >>> X_res, y_res = gsmote.fit_resample(X, y)
    >>> print('Resampled dataset shape %s' % Counter(y_res))
    Resampled dataset shape Counter({{1: 900, 0: 300}})
    """

    # Share of the requested synthetic samples that may be generated from
    # minority samples surrounded exclusively by minority neighbours.
    _MINORITY_STRATEGY_SHARE = 0.4

    # Candidate numbers of sub-clusters examined by the elbow search,
    # as a half-open (low, high) range.
    _ELBOW_K_RANGE = (3, 20)

    def __init__(
        self,
        sampling_strategy='auto',
        random_state=None,
        truncation_factor=1.0,
        deformation_factor=0.0,
        k_neighbors=5,
        n_jobs=1,
        sampling_rate=0.25,
    ):
        super(EGSmote, self).__init__(sampling_strategy=sampling_strategy)
        self.random_state = random_state
        self.truncation_factor = truncation_factor
        self.deformation_factor = deformation_factor
        self.k_neighbors = k_neighbors
        self.n_jobs = n_jobs
        self.sampling_rate = sampling_rate

    def _validate_estimator(self):
        """Create the necessary attributes for Geometric SMOTE."""
        # Check random state
        self.random_state_ = check_random_state(self.random_state)

        # Create nearest neighbors object for mixed class
        self.nn_mix_ = check_neighbors_object('nns_mixed', self.k_neighbors)
        self.nn_mix_.set_params(n_jobs=self.n_jobs)

        # Create nearest neighbors of positive class; one extra neighbour is
        # requested because every sample is its own nearest neighbour.
        self.nns_pos_ = check_neighbors_object(
            'nns_positive', self.k_neighbors, additional_neighbor=1
        )
        self.nns_pos_.set_params(n_jobs=self.n_jobs)

        # Create nearest neighbors of negative class
        self.nn_neg_ = check_neighbors_object('nn_negative', nn_object=1)
        self.nn_neg_.set_params(n_jobs=self.n_jobs)

    def _make_geometric_samples(self, X, y, pos_class_label, n_samples):
        """Return artificial samples inside the geometric regions defined by
        nearest neighbors.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Matrix containing the data which have to be sampled.
        y : array-like, shape (n_samples, )
            Corresponding label for each sample in X.
        pos_class_label : str or int
            The minority class (positive class) target value.
        n_samples : int
            The number of samples to generate.

        Returns
        -------
        X_new : ndarray, shape (n_samples_new, n_features)
            Synthetically generated samples.
        y_new : ndarray, shape (n_samples_new, )
            Target values for synthetic samples.

        Notes
        -----
        The estimator's ``truncation_factor`` attribute is neither read nor
        modified; the truncation is chosen per sample. When no minority
        sample can serve as a center any more (every sub-cluster budget is
        used up and no sample has a mixed neighbourhood) the budgets are no
        longer enforced, so that the requested number of samples is always
        produced.
        """
        # Return zero new samples
        if n_samples == 0:
            return (
                np.array([], dtype=X.dtype).reshape(0, X.shape[1]),
                np.array([], dtype=y.dtype),
            )

        # Select positive and negative class samples
        X_pos = X[y == pos_class_label]
        X_neg = X[y != pos_class_label]
        n_pos = len(X_pos)

        # Oversampling limit of every minority sub-cluster; work on a copy
        # because the budgets are consumed below.
        cluster_labels, cluster_budgets = self.sub_clustering(X_pos, n_samples)
        budgets = list(cluster_budgets)

        # Get positive k_nearest points (first column is the sample itself)
        self.nns_pos_.fit(X_pos)
        points_pos = self.nns_pos_.kneighbors(X_pos)[1][:, 1:]
        samples_indices = self.random_state_.randint(
            low=0, high=points_pos.size, size=n_samples
        )
        rows = np.floor_divide(samples_indices, points_pos.shape[1])
        cols = np.mod(samples_indices, points_pos.shape[1])

        # Get negative k_nearest points
        self.nn_neg_.fit(X_neg)
        points_neg = self.nn_neg_.kneighbors(X_pos)[1]

        # Get combined neighbours
        self.nn_mix_.fit(X)
        points_mix = self.nn_mix_.kneighbors(X_pos)[1]

        # Number of minority samples among the combined neighbours of every
        # minority sample, and the resulting neighbourhood categories.
        n_mix = points_mix.shape[1]
        minority_counts = np.sum(y[points_mix] == pos_class_label, axis=1)
        is_pure = minority_counts == n_mix
        has_mixed = bool(np.any((minority_counts > 0) & ~is_pure))
        pure_clusters = np.unique(np.asarray(cluster_labels)[is_pure])
        # Noisy samples are only used when nothing else is available.
        allow_noisy = not (has_mixed or bool(np.any(is_pure)))
        enforce_budget = True

        # Generate new samples
        X_new = np.zeros((n_samples, X.shape[1]))
        for ind, (row, col) in enumerate(zip(rows, cols)):
            while True:
                # Define center point
                center = X_pos[row]
                n_minority = minority_counts[row]

                if 0 < n_minority < n_mix:
                    # Combined strategy: the nearer of the two candidate
                    # surface points bounds the region.
                    surface_point_pos = X_pos[points_pos[row, col]]
                    surface_point_neg = X_neg[points_neg[row, 0]]
                    radius_pos = norm(center - surface_point_pos)
                    radius_neg = norm(center - surface_point_neg)
                    if radius_pos > radius_neg:
                        surface_point = surface_point_neg
                        truncation_factor = -1.0
                    else:
                        surface_point = surface_point_pos
                        truncation_factor = 0.4
                    break

                if n_minority == 0:
                    # Noisy point: no minority sample among its neighbours
                    usable = allow_noisy
                elif not enforce_budget:
                    usable = True
                else:
                    # Minority strategy, limited by the sub-cluster budget
                    cluster = cluster_labels[row]
                    usable = budgets[cluster] > 0
                    if usable:
                        budgets[cluster] -= 1

                if usable:
                    surface_point = X_pos[points_pos[row, col]]
                    truncation_factor = 1.0
                    break

                # The candidate was rejected. If no other sample could ever
                # be accepted, stop enforcing the budgets instead of looping
                # forever.
                if (
                    enforce_budget
                    and not has_mixed
                    and not any(budgets[label] > 0 for label in pure_clusters)
                ):
                    enforce_budget = False
                # Try another, randomly selected center.
                row = self.random_state_.randint(low=0, high=n_pos, size=1)[0]

            # Append new sample
            X_new[ind] = _make_geometric_sample(
                center,
                surface_point,
                truncation_factor,
                self.deformation_factor,
                self.random_state_,
            )

        # Create new samples for target variable
        y_new = np.array([pos_class_label] * n_samples)
        return X_new, y_new

    def sub_clustering(self, X_pos, n_samples):
        """Cluster the minority class and assign a sample budget per cluster.

        The number of clusters is chosen with the elbow heuristic of
        ``KElbowVisualizer``. A single cluster is used when the heuristic
        reports no elbow or when the minority class is too small for the
        candidate range (fewer than three candidate values remain).

        Parameters
        ----------
        X_pos : ndarray, shape (n_minority_samples, n_features)
            Samples of the minority class.
        n_samples : int
            Total number of synthetic samples that will be generated.

        Returns
        -------
        y_kmeans : ndarray, shape (n_minority_samples, )
            Sub-cluster index of every minority sample.
        range_oversample : list of int
            For every sub-cluster, the number of synthetic samples that may
            be generated under the minority strategy. The budgets are
            proportional to the cluster sizes and sum to at most
            ``_MINORITY_STRATEGY_SHARE * n_samples``.
        """
        n_pos = len(X_pos)
        # Reuse the validated random state when available so that the
        # clustering is reproducible together with the rest of the sampler.
        random_state = getattr(self, 'random_state_', None)

        # Find efficient number of clusters. k-means cannot build more
        # clusters than there are samples, so cap the candidate range.
        k_low, k_high = self._ELBOW_K_RANGE
        k_high = min(k_high, n_pos + 1)
        num_clusters = None
        if k_high - k_low >= 3:
            visualizer = KElbowVisualizer(
                KMeans(random_state=random_state), k=(k_low, k_high)
            )
            visualizer.fit(X_pos)  # Fit the data to the visualizer
            num_clusters = visualizer.elbow_value_
        if num_clusters is None:
            # No usable elbow: treat the minority class as one cluster.
            num_clusters = 1
        num_clusters = int(num_clusters)

        kmeans = KMeans(n_clusters=num_clusters, random_state=random_state)
        kmeans.fit(X_pos)
        y_kmeans = kmeans.predict(X_pos)

        # Synthetic samples created under minority strategy
        num_samples = n_samples * self._MINORITY_STRATEGY_SHARE
        cluster_sizes = np.bincount(y_kmeans, minlength=num_clusters)
        range_oversample = [
            int(size * (num_samples / n_pos)) for size in cluster_sizes
        ]
        return y_kmeans, range_oversample

    def _fit_resample(self, X, y):
        """Append the synthetic minority samples to a copy of ``X`` and ``y``.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_features)
            Matrix containing the data which have to be sampled.
        y : ndarray, shape (n_samples, )
            Corresponding label for each sample in X.

        Returns
        -------
        X_resampled : ndarray, shape (n_samples_new, n_features)
            The original samples followed by the synthetic ones.
        y_resampled : ndarray, shape (n_samples_new, )
            The corresponding labels.
        """
        # Validate estimator's parameters
        self._validate_estimator()

        # Copy data
        X_resampled, y_resampled = X.copy(), y.copy()

        # Resample data
        for class_label, n_samples in self.min_oversample(y).items():
            # A negative amount means the class is already large enough.
            X_new, y_new = self._make_geometric_samples(
                X, y, class_label, max(n_samples, 0)
            )
            # Append new data
            X_resampled = np.vstack((X_resampled, X_new))
            y_resampled = np.hstack((y_resampled, y_new))

        return X_resampled, y_resampled

    def min_oversample(self, y):
        """Find the number of minority samples to generate.

        Parameters
        ----------
        y : array-like, shape (n_samples, )
            Target values of the data set.

        Returns
        -------
        sampling_strategy : OrderedDict
            Single entry mapping the least frequent class to the number of
            samples needed to reach
            ``int(n_majority * sampling_rate / (1 - sampling_rate))``
            samples. The value is negative when the class already exceeds
            that size.

        Raises
        ------
        ValueError
            If ``sampling_rate`` equals 1, for which the target size is
            undefined.
        """
        majority_share = 1 - self.sampling_rate
        if majority_share == 0:
            raise ValueError(
                "sampling_rate must be different from 1, got %r."
                % (self.sampling_rate,)
            )

        target_stats = _count_class_sample(y)
        n_sample_majority = max(target_stats.values())
        class_minority = min(target_stats, key=target_stats.get)
        n_target = int(
            n_sample_majority * (self.sampling_rate / majority_share)
        )
        return collections.OrderedDict(
            [(class_minority, n_target - target_stats[class_minority])]
        )