-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathieseg.py
111 lines (74 loc) · 4.58 KB
/
ieseg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from pandas import DataFrame
from pandas import Series
from pandas import read_csv
from numpy import array
from numpy import random
from sklearn.metrics import roc_curve
def roc (dataSet: DataFrame, actuals: str, probability: str) -> DataFrame:
    """Return the points of the ROC curve for a scored data set.

    Args:
        dataSet: Frame holding the observed labels and the model scores.
        actuals: Name of the column with the observed labels; the value 1
            is treated as the positive class.
        probability: Name of the column with the predicted scores.

    Returns:
        A frame with the columns "True positive rate" and
        "False positive rate", one row per threshold reported by
        ``roc_curve``. The thresholds themselves are not returned.
    """
    observed = array(dataSet[actuals])
    scores = array(dataSet[probability])
    falsePositiveRate, truePositiveRate, _ = roc_curve(observed, scores, pos_label = 1)
    return DataFrame({
        "True positive rate": truePositiveRate,
        "False positive rate": falsePositiveRate,
    })
def lift (dataSet: DataFrame, actuals: str, probability: str, precision: int = 20) -> DataFrame:
    """Return the lift per quantile for a scored data set.

    The cumulative response of each quantile (as produced by
    ``cumulativeResponse``) is divided by the average response, so a value
    above 1 means the quantile does better than the overall average.

    Args:
        dataSet: Frame holding the observed labels and the model scores.
        actuals: Name of the column with the observed labels.
        probability: Name of the column with the predicted scores.
        precision: Number of bins the scores are divided into.

    Returns:
        A frame with the columns "Quantile", "Lift" and "Base", where
        "Base" is the average response divided by itself.
    """
    response = cumulativeResponse(dataSet = dataSet, actuals = actuals, probability = probability, precision = precision)
    # "Average response" carries the same value on every row; max() just picks it.
    averageResponse = response["Average response"].max()
    response["Lift"] = response["Cumulative response"] / averageResponse
    response["Base"] = response["Average response"] / averageResponse
    return response[["Quantile","Lift","Base"]]
def cumulativeResponse (dataSet: DataFrame, actuals: str, probability: str, precision: int = 20) -> DataFrame:
    """Return the cumulative response rate per quantile of the score.

    Rows are binned on ``probability`` with ``equifrequentBinning``; the
    bins are then walked from the highest quantile label to the lowest
    while accumulating the sum of ``actuals`` and the row count.

    Args:
        dataSet: Frame holding the observed labels and the model scores.
        actuals: Name of the column with the observed labels (summed per bin).
        probability: Name of the column with the predicted scores.
        precision: Number of bins the scores are divided into.

    Returns:
        A frame with the columns "Quantile" (bin number / precision),
        "Cumulative response" (accumulated actuals / accumulated rows) and
        "Average response" (total actuals / total rows, repeated on every
        row), ordered by descending "Quantile".
    """
    binned = equifrequentBinning (dataSet = dataSet[[actuals, probability]], byColumn = probability, into = precision)
    binned["Quantile"] = binned[probability + "_bin"] / precision
    binned["obs"] = 1  # one per row, so summing it yields the bin size

    perQuantile = binned[["Quantile", actuals, "obs"]].groupby(["Quantile"], as_index = False).sum()
    perQuantile = perQuantile.sort_values(by = "Quantile", ascending = False)

    # Running totals, starting from the highest quantile label.
    targetsSoFar = perQuantile[actuals].cumsum(skipna = False)
    rowsSoFar = perQuantile["obs"].cumsum(skipna = False)

    perQuantile["Cumulative response"] = targetsSoFar / rowsSoFar
    perQuantile["Average response"] = targetsSoFar.max() / rowsSoFar.max()
    return perQuantile[["Quantile","Cumulative response","Average response"]]
def cumulativeGains (dataSet: DataFrame, actuals: str, probability: str, precision: int = 20) -> DataFrame:
    """Return the cumulative gains per quantile of the score.

    Rows are binned on ``probability`` with ``equifrequentBinning``; the
    bins are then walked from the highest quantile label to the lowest
    while accumulating the sum of ``actuals``.

    Args:
        dataSet: Frame holding the observed labels and the model scores.
        actuals: Name of the column with the observed labels (summed per bin).
        probability: Name of the column with the predicted scores.
        precision: Number of bins the scores are divided into.

    Returns:
        A frame with the columns "Quantile" (bin number / precision),
        "Cumulative gains" (accumulated actuals / largest accumulated
        value) and "Base" (a copy of "Quantile"), ordered by descending
        "Quantile".
    """
    binned = equifrequentBinning (dataSet = dataSet[[actuals, probability]], byColumn = probability, into = precision)
    binned["Quantile"] = binned[probability + "_bin"] / precision
    binned["obs"] = 1

    perQuantile = binned[["Quantile", actuals, "obs"]].groupby(["Quantile"], as_index = False).sum()
    perQuantile = perQuantile.sort_values(by = "Quantile", ascending = False)

    # Running total of the target, starting from the highest quantile label.
    targetsSoFar = perQuantile[actuals].cumsum(skipna = False)
    perQuantile["Cumulative gains"] = targetsSoFar / targetsSoFar.max()
    # NOTE(review): "Base" mirrors the quantile label, which is highest on the
    # first accumulated row - confirm this is the intended baseline.
    perQuantile["Base"] = perQuantile["Quantile"]
    return perQuantile[["Quantile","Cumulative gains","Base"]]
def equifrequentBinning (dataSet: DataFrame, byColumn: str, into: int) -> DataFrame:
    """Assign every row to one of ``into`` quantile-based bins.

    The lower edge of bin ``k`` (1-based) is the ``(k - 1) / into`` quantile
    of ``byColumn``; a row lands in the highest bin whose lower edge does
    not exceed its value. Rows whose value cannot be compared (e.g. NaN)
    receive no bin number.

    Args:
        dataSet: Frame containing ``byColumn``. It is not modified.
        byColumn: Name of the numeric column to bin on.
        into: Number of bins; must be at least 1.

    Returns:
        A copy of ``dataSet`` with two extra columns: ``"Bin"`` (always 0,
        kept only so the output layout stays as before) and
        ``byColumn + "_bin"`` holding the bin number.

    Raises:
        ValueError: If ``into`` is smaller than 1.
    """
    if into < 1:
        raise ValueError("Number of bins must be at least 1.")

    # Work on a copy so the caller's frame does not silently gain columns.
    internalSet = dataSet.copy()

    # Only the binning column is needed; asking for the quantiles of every
    # column fails as soon as the frame holds non-numeric data.
    lowerEdges = internalSet[byColumn].quantile([i / into for i in range(into)]).tolist()

    internalSet["Bin"] = 0
    binColumn = byColumn + "_bin"
    # Edges are visited in ascending order, so later (higher) bins overwrite
    # earlier ones. The label comes from the loop position rather than being
    # rebuilt from the float quantile, which could truncate one too low.
    for binNumber, lowerEdge in enumerate(lowerEdges, start = 1):
        internalSet.loc[internalSet[byColumn] >= lowerEdge, binColumn] = binNumber
    return internalSet
def partition (dataFrame : DataFrame, splitStrategy: [float]) -> [DataFrame]:
    """Randomly split a frame into consecutive shares of its rows.

    Every row receives a distinct random rank in ``0 .. n - 1``. Partition
    ``k`` starts at the rank ``(sum of the shares before k) * n``, and a
    row belongs to the last partition whose start does not exceed its rank.

    Args:
        dataFrame: Frame to split. It is not modified.
        splitStrategy: List of shares, one per partition, summing to 1
            (for example ``[0.7, 0.2, 0.1]``).

    Returns:
        A list with one frame per entry of ``splitStrategy``, in the same
        order, each with a fresh 0-based index and the original columns.

    Raises:
        KeyError: If ``splitStrategy`` is not a list (exception type kept
            for backward compatibility).
        ValueError: If the shares do not sum to 1.
    """
    if not isinstance(splitStrategy, list):
        raise KeyError("Split strategy must be an array of floating point values.")
    # Compare with a tolerance: sums such as 0.7 + 0.2 + 0.1 are not exactly 1.
    if abs(sum(splitStrategy) - 1) > 1e-9:
        raise ValueError("Split strategy must sum to 1.")

    observations = dataFrame.shape[0]
    ranks = random.permutation(observations)

    # Labels are kept outside the frame so that input columns which happen
    # to be called "random" or "Split" are neither overwritten nor dropped.
    labels = array([0] * observations)
    cumulativeSplit = 0
    for position, split in enumerate(splitStrategy, start = 1):
        # Everything from this partition's start upwards gets its label;
        # the next partition then overwrites its own tail.
        labels[ranks >= cumulativeSplit * observations] = position
        cumulativeSplit += split

    return [
        dataFrame.loc[labels == position].reset_index(drop = True)
        for position in range(1, len(splitStrategy) + 1)
    ]