-
Notifications
You must be signed in to change notification settings - Fork 0
/
fdt.py
490 lines (430 loc) · 16.1 KB
/
fdt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
# Federated Distribution Tree
# Dependencies
import pandas as pd
import numpy as np
from fbd import FBD
from flr import FLR
class FDTNode:
    """
    Node of a federated distribution tree.

    Holds the per-node statistics needed to predict with the configured
    ``algo_type`` and, once the node is split, a list of ``FDTBranch``
    objects in ``self.branches`` pointing at the child nodes.
    """
    def __init__(
        self,
        depth: int,
        features: list,
        n: int,
        node_type: str,
        rule: str,
        algo_type: str = "classification",  # classification | regression | bayes | linear
        counts: dict = None,
        yhat: float = None,
        ystd: float = None,
        fbd: "FBD" = None,  # forward-reference annotations: no def-time dependency on FBD/FLR
        flr: "FLR" = None
    ):
        self.depth = depth          # depth of the node in the tree (root = 0)
        self.features = features    # feature names usable for splits below this node
        self.n = n                  # number of observations that reached the node
        self.node_type = node_type  # 'root' | 'main_node' | 'sub_node'
        self.algo_type = algo_type
        self.rule = rule            # human-readable rule that produced this node
        self.counts = counts        # {class: count} mapping (classification only)
        self.yhat = yhat            # point prediction of the node
        self.ystd = ystd            # std of Y in the node (regression only)
        self.fbd = fbd              # fitted federated bayes model (bayes only)
        self.flr = flr              # fitted federated linear model (linear only)
        self.branches = None        # list of FDTBranch after the node is split
    @staticmethod
    def Counter(arr):
        """Return a {value: count} mapping of the values in ``arr``."""
        values, counts = np.unique(arr, return_counts=True)
        return {values[i]: counts[i] for i in range(len(values))}
    @staticmethod
    def Counter_dominatant(counts: dict[int, int]) -> int:
        """
        Return the class with the highest count (the node's prediction),
        or None when ``counts`` is empty/None.
        """
        if counts:
            return max(counts, key=counts.get)
        return None
    def print_info(self, width = 4):
        """
        Print the information about this node.

        Args:
            width: base width used to compute the depth-based indent.
        """
        # Defining the number of spaces for the depth-based indent
        const = int(self.depth * width ** 1.5)
        spaces = "-" * const
        if self.node_type == 'root':
            print("Root")
        else:
            print(f"|{spaces} Split rule: {self.rule}")
        # counts is only populated for classification trees; guard so that
        # printing a regression/bayes/linear tree does not raise TypeError
        if self.counts is not None:
            print(f"{' ' * const} | Class distribution in the node: {dict(self.counts)}")
        print(f"{' ' * const} | Predicted class: {self.yhat}")
    def print_tree(self):
        """
        Prints the whole tree from the current node to the bottom
        """
        self.print_info()
        if self.branches is not None:
            for branch in self.branches:
                branch.left.print_tree()
                branch.right.print_tree()
    def get_stats(self):
        """
        Return statistics of the federated distribution tree
        (currently the number of leaf nodes under this node).
        """
        if self.branches is not None:
            count = 0
            for branch in self.branches:
                stats_left = branch.left.get_stats()
                stats_right = branch.right.get_stats()
                count += stats_left["count"]
                count += stats_right["count"]
            return {"count": count}
        else:
            return {"count": 1}
    def predict(self, X: pd.DataFrame):
        """
        Batch prediction: one prediction per row of ``X``, dispatched on
        the node's ``algo_type``.
        """
        predictions = []
        for _, x in X.iterrows():
            if self.algo_type == "bayes":
                pred = self.predict_bayes(x)
                predictions.append(max(pred, key=pred.get))
            elif self.algo_type == "linear":
                predictions.append(self.predict_linear(x))
            elif self.algo_type == "regression":
                # regression leaves carry no flr model, so predict_linear
                # falls back to the node's yhat
                predictions.append(self.predict_linear(x))
            elif self.algo_type == "classification":
                predictions.append(self.predict_obs(x))
        return predictions
    def predict_obs(self, values: pd.Series):
        """
        Predict the class for a single observation: majority vote over the
        branches' child predictions, weighted by each branch's merge count.
        """
        if self.branches:
            node_map = {}
            for branch in self.branches:
                if values[branch.feature] < branch.value:
                    y_pred = branch.left.predict_obs(values)
                else:
                    y_pred = branch.right.predict_obs(values)
                node_map[y_pred] = node_map.get(y_pred, 0) + branch.n
            return self.Counter_dominatant(node_map)
        return self.yhat
    def predict_bayes(self, values: pd.Series) -> dict:
        """
        Predict {class: expectation} for a single observation by summing
        the per-branch bayes predictions down the tree.
        """
        if self.branches:
            node_map = {}
            for branch in self.branches:
                if values[branch.feature] < branch.value:
                    pred = branch.left.predict_bayes(values)
                else:
                    pred = branch.right.predict_bayes(values)
                for k in pred:
                    node_map[k] = node_map.get(k, 0) + pred[k]
            return node_map
        if self.fbd:
            return self.fbd.predict_one(values)
        return {}
    def predict_linear(self, values: pd.Series):
        """
        Predict a numeric value for a single observation: branch-count
        weighted average of the child predictions; leaves use the local
        linear model if present, otherwise the node mean.
        """
        if self.branches:
            node_acc = 0.0
            node_cnt = 0
            for branch in self.branches:
                if values[branch.feature] < branch.value:
                    y_pred = branch.left.predict_linear(values)
                else:
                    y_pred = branch.right.predict_linear(values)
                node_acc += y_pred * branch.n
                node_cnt += branch.n
            return node_acc / node_cnt
        if self.flr:
            return self.flr.predict_one(values)
        return self.yhat
    def merge(self, tree):
        """
        Merge another federated distribution tree node (and its subtree)
        into this one, to support federated learning.
        """
        if self.algo_type == "classification":
            # Accumulate class counts from both nodes. (The previous
            # dict.update overwrote this node's counts with the other
            # tree's instead of summing them.)
            for label, count in tree.counts.items():
                self.counts[label] = self.counts.get(label, 0) + count
            self.yhat = self.Counter_dominatant(self.counts)
        elif self.algo_type == "regression":
            # n-weighted average of the two node predictions
            self.yhat = (self.n * self.yhat + tree.n * tree.yhat) / (self.n + tree.n)
        elif self.algo_type == "bayes":
            self.fbd.merge(tree.fbd)
        elif self.algo_type == "linear":
            self.flr.merge(tree.flr)
        # Keep the observation count in sync so repeated merges stay
        # correctly weighted
        self.n = self.n + tree.n
        if self.branches and tree.branches:
            for t in tree.branches:
                no_match = True
                for s in self.branches:
                    if s.feature == t.feature and s.direct == t.direct:
                        no_match = False
                        s.left.merge(t.left)
                        s.right.merge(t.right)
                        # n-weighted average of the two split thresholds
                        new_count = s.n + t.n
                        s.value = (s.n * s.value + t.n * t.value) / new_count
                        s.n = new_count
                if no_match:
                    self.branches.append(t)
        elif tree.branches:
            self.branches = tree.branches
class FDTBranch:
    """
    A single split decision inside a federated distribution tree.

    Observations with ``values[feature] < value`` are routed to the
    ``left`` node and the rest to ``right``; ``n`` counts the trees
    backing this branch and ``direct`` is the direction flag computed
    when the split was fitted.
    """
    def __init__(
        self,
        n: int,
        feature: str,
        value: float,
        left: FDTNode,
        right: FDTNode,
        direct: bool
    ):
        # Plain data holder: remember every piece of the split decision.
        self.n, self.feature, self.value = n, feature, value
        self.left, self.right, self.direct = left, right, direct
class FDT:
    """
    Builder for a federated distribution tree.

    ``fit`` recursively grows a tree of ``FDTNode`` objects connected by
    ``FDTBranch`` splits. Supported ``algo_type`` values are
    "classification", "regression", "bayes" and "linear".
    """
    def __init__(
        self,
        min_samples_split = None,
        min_samples_leaf = None,
        max_depth = None,
        depth = None,
        node_type = None,
        rule = None,
        algo_type: str = "classification"
    ):
        # Saving the hyper parameters (with defaults)
        self.min_samples_split = min_samples_split if min_samples_split else 20
        self.min_samples_leaf = min_samples_leaf if min_samples_leaf else 10
        self.max_depth = max_depth if max_depth else 5
        # Default current depth of node (root = 0)
        self.depth = depth if depth else 0
        # Type of node: 'root' | 'main_node' | 'sub_node'
        self.node_type = node_type if node_type else 'root'
        # Rule for splitting (previous stage split)
        self.rule = rule if rule else ""
        # Algorithm: classification | regression | bayes | linear
        self.algo_type = algo_type
        # The FDTNode produced by fit()
        self.tree = None
    @staticmethod
    def ma(x: np.array) -> np.array:
        """
        Moving average of consecutive pairs: the midpoints between sorted
        unique feature values, used as candidate split thresholds.
        """
        return (x[1:] + x[:-1]) / 2
    @staticmethod
    def GINI_impurity(counts: dict[int, int]) -> float:
        """
        GINI impurity of a multi-class {class: count} dictionary.
        """
        # multi class calculation from Counter
        multi_count = counts.values()
        n = sum(multi_count)
        # An empty node is treated as pure (impurity 0.0) instead of
        # raising ZeroDivisionError
        if n == 0:
            return 0.0
        return 1 - sum([(i / n) ** 2 for i in multi_count])
    @staticmethod
    def MSE_loss(ytrue, yhat):
        """
        Mean squared error of ``yhat`` against ``ytrue``
        (``inf`` for empty input).
        """
        # Getting the residuals
        r = np.sum((ytrue - yhat) ** 2)
        # Getting the average and returning
        n = len(ytrue)
        if n == 0:
            return float('inf')
        return r / n
    @staticmethod
    def FBD_loss(X, Y):
        """GINI impurity of the hit/miss pattern of a bayes model fit on (X, Y)."""
        tfbd = FBD().fit(X, Y)
        return FDT.GINI_impurity(FDTNode.Counter(tfbd.predict(X) == np.array(Y)))
    @staticmethod
    def FLR_loss(X, Y):
        """MSE of a federated linear regression fit on (X, Y)."""
        tflr = FLR().fit(X, Y)
        return FDT.MSE_loss(Y, tflr.predict(X))
    def get_Loss(self, X, Y):
        """Node loss for the configured algo_type (0.0 for unknown types)."""
        if self.algo_type == "classification":
            return self.GINI_impurity(self.counts)
        elif self.algo_type == "regression":
            return self.MSE_loss(Y, self.yhat)
        elif self.algo_type == "bayes":
            return self.GINI_impurity(FDTNode.Counter(self.fbd.predict(X) == np.array(Y)))
        elif self.algo_type == "linear":
            return self.MSE_loss(Y, self.flr.predict(X))
        return 0.0
    def best_split(self, X, Y) -> tuple:
        """
        Given the X features and Y targets, calculate the best split.

        Returns:
            (best_feature, best_value, best_direction) — all None when no
            candidate split improves on the node's base loss.
        """
        # Creating a dataset for splitting
        df = X.copy()
        df['Y'] = Y
        # Loss of the unsplit node: the baseline for the gain
        loss_base = self.get_Loss(X, Y)
        # Finding which split yields the best gain
        max_gain = 0
        # Default best feature and split
        best_feature = None
        best_value = None
        best_direction = None
        for feature in self.features:
            # Candidate thresholds: midpoints between sorted unique values
            xmeans = self.ma(np.sort(df[feature].unique()))
            for value in xmeans[self.min_samples_leaf - 1: len(xmeans) - self.min_samples_leaf + 1]:
                # NOTE(review): this candidate partition uses `<` while
                # fit() partitions with `<=`; midpoint thresholds make the
                # two agree unless a data value coincides with a midpoint.
                left_df = df[df[feature] < value]
                left_y = left_df['Y']
                right_df = df[df[feature] >= value]
                right_y = right_df['Y']
                # Getting the obs count from the left and the right data splits
                n_left = len(left_y)
                n_right = len(right_y)
                # Enforce min_samples_leaf on the actual row counts: the
                # slice above only bounds the number of *unique* values, so
                # duplicated feature values could otherwise produce leaves
                # that are too small or even empty (np.mean([]) -> NaN)
                if n_left < self.min_samples_leaf or n_right < self.min_samples_leaf:
                    continue
                if self.algo_type == "classification":
                    # Getting the left and right gini impurities
                    loss_left = self.GINI_impurity(FDTNode.Counter(left_y))
                    loss_right = self.GINI_impurity(FDTNode.Counter(right_y))
                elif self.algo_type == "regression":
                    # Getting the left and right mse loss
                    loss_left = self.MSE_loss(left_y, np.mean(left_y))
                    loss_right = self.MSE_loss(right_y, np.mean(right_y))
                elif self.algo_type == "bayes":
                    left_x = left_df[self.features]
                    right_x = right_df[self.features]
                    loss_left = self.FBD_loss(left_x, left_y)
                    loss_right = self.FBD_loss(right_x, right_y)
                elif self.algo_type == "linear":
                    left_x = left_df[self.features]
                    right_x = right_df[self.features]
                    loss_left = self.FLR_loss(left_x, left_y)
                    loss_right = self.FLR_loss(right_x, right_y)
                # Calculating the weighted loss
                wloss_left = n_left / (n_left + n_right) * loss_left
                wloss_right = n_right / (n_left + n_right) * loss_right
                wloss = wloss_left + wloss_right
                # Calculating the gain
                gain = loss_base - wloss
                # Checking if this is the best split so far
                if gain > max_gain:
                    best_feature = feature
                    best_value = value
                    best_direction = bool(wloss_left > wloss_right)
                    # Setting the best gain to the current one
                    max_gain = gain
        return (best_feature, best_value, best_direction)
    def fit(
        self,
        X: pd.DataFrame,
        Y: list
    ) -> "FDTNode":
        """
        Recursively build the federated distribution tree for (X, Y).

        Returns:
            The FDTNode rooting the (sub)tree built at this depth.
        """
        # Extracting all the features
        self.features = list(X.columns)
        # Init None variables; only the stats relevant to algo_type are filled
        self.counts = None
        self.yhat = None
        self.ystd = None
        self.fbd = None
        self.flr = None
        # Calculating the counts of Y in the node
        if self.algo_type == "classification":
            self.counts = FDTNode.Counter(Y)
            self.yhat = FDTNode.Counter_dominatant(self.counts)
        # Federated bayes decision
        if self.algo_type == "bayes":
            self.fbd = FBD().fit(X, Y)
        # Regression
        if self.algo_type == "regression":
            self.yhat = np.mean(Y)
            self.ystd = np.std(Y)
        # Federated linear regression
        if self.algo_type == "linear":
            self.flr = FLR().fit(X, Y)
            self.yhat = np.mean(Y)
        # Saving the number of observations in the node
        self.n = len(Y)
        # Making a df from the data
        df = X.copy()
        df['Y'] = Y
        self.tree = FDTNode(
            depth = self.depth,
            features = self.features,
            n = self.n,
            node_type = self.node_type,
            rule = self.rule,
            algo_type = self.algo_type,
            counts = self.counts,
            yhat = self.yhat,
            ystd = self.ystd,
            fbd = self.fbd,
            flr = self.flr
        )
        # If there is loss to be gained, we split further
        if (self.depth < self.max_depth) and (self.n >= self.min_samples_split):
            # Getting the best split
            best_feature, best_value, best_direction = self.best_split(X, Y)
            if best_feature is not None:
                # Getting the left and right data partitions
                left_df, right_df = df[df[best_feature]<=best_value].copy(), df[df[best_feature]>best_value].copy()
                # The higher-loss side of the split becomes the 'main_node'
                left_type = ["sub_node", "main_node"][best_direction]
                right_type = ["main_node", "sub_node"][best_direction]
                # Creating the left and right nodes
                left = FDT(
                    depth = self.depth + 1,
                    max_depth = self.max_depth,
                    min_samples_split = self.min_samples_split,
                    min_samples_leaf = self.min_samples_leaf,
                    node_type = left_type,
                    rule = f"{best_feature} <= {round(best_value, 3)}",
                    algo_type = self.algo_type
                ).fit(
                    left_df[self.features],
                    left_df['Y'].values.tolist()
                )
                right = FDT(
                    depth = self.depth + 1,
                    max_depth = self.max_depth,
                    min_samples_split = self.min_samples_split,
                    min_samples_leaf = self.min_samples_leaf,
                    node_type = right_type,
                    rule = f"{best_feature} > {round(best_value, 3)}",
                    algo_type = self.algo_type
                ).fit(
                    right_df[self.features],
                    right_df['Y'].values.tolist()
                )
                # Create the FDT branch; n starts at 1 and grows as trees
                # from other parties are merged in
                self.tree.branches = [FDTBranch(
                    n = 1,
                    feature = best_feature,
                    value = best_value,
                    left = left,
                    right = right,
                    direct = best_direction
                )]
        return self.tree