-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path__init__.py
231 lines (177 loc) · 8.47 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import os
import pytest
import pathlib
import numpy as np
import pandas as pd
from munch import DefaultMunch
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
from virny.datasets.base import BaseDataLoader
def get_root_dir():
    """Locate the repository root relative to the current working directory.

    Returns:
        '..' when tests are executed from inside the tests/ folder,
        the cwd itself when the 'virny/datasets' package dir is visible
        (i.e. running from the repo root), and '../..' otherwise
        (e.g. running from a nested test subdirectory).
    """
    # os.path.basename is portable; splitting on '/' breaks on Windows,
    # where os.getcwd() uses backslashes.
    cur_folder_name = os.path.basename(os.getcwd())
    if cur_folder_name == "tests":
        return os.path.join('..')
    # Running from the repo root if the package data dir is reachable.
    if os.path.exists(os.path.join('virny', 'datasets')):
        return os.getcwd()
    return os.path.join('..', '..')
def compare_metric_dfs_v2(df1, df2):
    """Strictly compare two metric dataframes, ignoring row order.

    Returns True only when both frames have the same shape, the same set
    of column names, and exactly equal values (and dtypes, per
    DataFrame.equals) after sorting rows by the 'Metric' column.
    """
    if df1.shape != df2.shape:
        return False
    if sorted(df1.columns.tolist()) != sorted(df2.columns.tolist()):
        return False
    # Align row order before the element-wise comparison.
    df1 = df1.sort_values(by='Metric').reset_index(drop=True)
    df2 = df2.sort_values(by='Metric').reset_index(drop=True)
    return df1.equals(df2)
def compare_metric_dfs_with_tolerance(df1, df2, tolerance=1e-6):
    """Compare two metric dataframes, allowing a numeric tolerance.

    Rows are aligned by sorting on 'Metric'. The 'Metric', 'Model_Name'
    and 'Model_Params' columns must match exactly; every other column is
    treated as numeric and compared with an absolute tolerance.

    Returns True when shapes, column names, categorical values and
    (tolerance-wise) numerical values all agree; False otherwise.
    """
    if df1.shape != df2.shape:
        return False
    if sorted(df1.columns.tolist()) != sorted(df2.columns.tolist()):
        return False
    # Align row order before the element-wise comparisons.
    df1 = df1.sort_values(by='Metric').reset_index(drop=True)
    df2 = df2.sort_values(by='Metric').reset_index(drop=True)
    categorical_cols = ['Metric', 'Model_Name', 'Model_Params']
    numerical_cols = [col for col in df1.columns if col not in categorical_cols]
    # Numerical columns compared with absolute tolerance; categorical exactly.
    close_numerical = np.isclose(df1[numerical_cols], df2[numerical_cols], atol=tolerance).all()
    equal_categorical = (df1[categorical_cols] == df2[categorical_cols]).all().all()
    return bool(close_numerical and equal_categorical)
def compare_metric_dfs(expected_composed_metrics_df, actual_composed_metrics_df,
                       model_name, metrics_lst, groups, alpha=0.000_001):
    """Assert that expected and actual metric values agree within alpha.

    For each metric in `metrics_lst` and each group column in `groups`,
    pulls the value for `model_name` from both dataframes and asserts the
    absolute difference is below `alpha`. Raises AssertionError on the
    first mismatch.
    """
    for metric_name in metrics_lst:
        # Select the rows for this model/metric once per metric.
        expected_rows = expected_composed_metrics_df[
            (expected_composed_metrics_df['Model_Name'] == model_name) &
            (expected_composed_metrics_df['Metric'] == metric_name)
        ]
        actual_rows = actual_composed_metrics_df[
            (actual_composed_metrics_df['Model_Name'] == model_name) &
            (actual_composed_metrics_df['Metric'] == metric_name)
        ]
        for group in groups:
            expected_metric_val = expected_rows[group].values[0]
            actual_metric_val = actual_rows[group].values[0]
            assert abs(expected_metric_val - actual_metric_val) < alpha, f"Assert for {metric_name} metric and {group} group"
# Resolved once at import time; fixtures below use it to build dataset paths.
ROOT_DIR = get_root_dir()
@pytest.fixture(scope='package')
def config_params():
    """Package-scoped COMPAS run configuration, wrapped for attribute access."""
    params = dict(
        dataset_name='COMPAS',
        test_set_fraction=0.2,
        bootstrap_fraction=0.8,
        n_estimators=100,
        runs_seed_lst=[100, 200, 300, 400, 500, 600],
        sensitive_attributes_dct={'sex': 1, 'race': 'African-American', 'sex&race': None},
    )
    return DefaultMunch.fromDict(params)
@pytest.fixture(scope='package')
def folk_emp_config_params():
    """Package-scoped Folktables NY-2018 employment run configuration."""
    params = dict(
        dataset_name='Folktables_NY_2018_Employment',
        test_set_fraction=0.2,
        bootstrap_fraction=0.8,
        n_estimators=100,
        num_runs=1,
        runs_seed_lst=[100],
        sensitive_attributes_dct={'SEX': '2', 'RAC1P': '2', 'SEX & RAC1P': None},
    )
    return DefaultMunch.fromDict(params)
@pytest.fixture(scope='package')
def models_config():
    """Pre-configured model instances keyed by their class name."""
    models = {}
    models['DecisionTreeClassifier'] = DecisionTreeClassifier(
        criterion='gini',
        max_depth=20,
        max_features=0.6,
        min_samples_split=0.1,
    )
    models['LogisticRegression'] = LogisticRegression(
        C=1,
        max_iter=50,
        penalty='l2',
        solver='newton-cg',
    )
    models['XGBClassifier'] = XGBClassifier(
        learning_rate=0.1,
        n_estimators=200,
        max_depth=7,
    )
    return models
@pytest.fixture(scope='package')
def folk_employment_NY_2018_loader():
    """BaseDataLoader over the bundled Folktables NY-2018 employment CSV."""
    csv_path = pathlib.Path(__file__).parent / 'files_for_tests' / 'folk_employment_NY_2018.csv'
    full_df = pd.read_csv(csv_path, header=0)
    categorical_columns = ['MAR', 'MIL', 'ESP', 'MIG', 'DREM', 'NATIVITY', 'DIS', 'DEAR', 'DEYE', 'SEX', 'RAC1P',
                           'RELP', 'CIT', 'ANC', 'SCHL']
    # Categorical codes arrive as numbers in the CSV; cast them to strings.
    full_df[categorical_columns] = full_df[categorical_columns].astype('str')
    return BaseDataLoader(full_df=full_df,
                          target='ESR',
                          numerical_columns=['AGEP'],
                          categorical_columns=categorical_columns)
@pytest.fixture(scope='package')
def compas_dataset_class():
    """BaseDataLoader over the bundled COMPAS dataset, sensitive attrs included."""
    df = pd.read_csv(os.path.join(ROOT_DIR, 'virny', 'datasets', 'data', 'COMPAS.csv'))
    int_columns = ['recidivism', 'age', 'age_cat_25 - 45', 'age_cat_Greater than 45',
                   'age_cat_Less than 25', 'c_charge_degree_F', 'c_charge_degree_M', 'sex']
    # Force integer dtype on the target and the one-hot/binary columns.
    df = df.astype(dict.fromkeys(int_columns, "int"))
    return BaseDataLoader(
        full_df=df,
        target='recidivism',
        numerical_columns=['age', 'juv_fel_count', 'juv_misd_count', 'juv_other_count', 'priors_count'],
        categorical_columns=['race', 'age_cat_25 - 45', 'age_cat_Greater than 45',
                             'age_cat_Less than 25', 'c_charge_degree_F', 'c_charge_degree_M', 'sex'],
    )
@pytest.fixture(scope='package')
def compas_without_sensitive_attrs_dataset_class():
    """BaseDataLoader over COMPAS with sensitive attrs excluded from features."""
    df = pd.read_csv(os.path.join(ROOT_DIR, 'virny', 'datasets', 'data', 'COMPAS.csv'))
    int_columns = ['recidivism', 'age', 'age_cat_25 - 45', 'age_cat_Greater than 45',
                   'age_cat_Less than 25', 'c_charge_degree_F', 'c_charge_degree_M', 'sex']
    # Force integer dtype on the target and the one-hot/binary columns.
    df = df.astype(dict.fromkeys(int_columns, "int"))
    return BaseDataLoader(
        full_df=df,
        target='recidivism',
        numerical_columns=['juv_fel_count', 'juv_misd_count', 'juv_other_count', 'priors_count'],
        categorical_columns=['age_cat_25 - 45', 'age_cat_Greater than 45',
                             'age_cat_Less than 25', 'c_charge_degree_F', 'c_charge_degree_M'],
    )
@pytest.fixture(scope='package')
def COMPAS_y_test():
    """COMPAS test-set labels, indexed by the '0' column of the saved CSV."""
    y_path = os.path.join(ROOT_DIR, 'tests', 'files_for_tests', 'COMPAS_use_case', 'COMPAS_y_test.csv')
    return pd.read_csv(y_path, header=0).set_index("0")
@pytest.fixture(scope='package')
def COMPAS_RF_expected_preds():
    """Expected RF predictions for the COMPAS use case, indexed by the '0' column."""
    preds_path = os.path.join(ROOT_DIR, 'tests', 'files_for_tests', 'COMPAS_use_case',
                              'COMPAS_RF_expected_preds.csv')
    return pd.read_csv(preds_path, header=0).set_index("0")
@pytest.fixture(scope='package')
def COMPAS_RF_bootstrap_predictions():
    """Per-estimator RF bootstrap predictions for the COMPAS use case.

    Returns:
        dict mapping the estimator index (int) to a numpy array of that
        estimator's predictions; the CSV columns are stringified indices.
    """
    models_predictions = pd.read_csv(os.path.join(ROOT_DIR, 'tests', 'files_for_tests', 'COMPAS_use_case',
                                                  'COMPAS_RF_predictions.csv'), header=0)
    models_predictions = models_predictions.reset_index(drop=True)
    # Dict comprehension replaces the manual per-column loop (same result).
    return {int(col): models_predictions[col].to_numpy() for col in models_predictions.columns}
@pytest.fixture(scope='package')
def COMPAS_RF_expected_metrics():
    """Expected composed metrics for the RF COMPAS use case."""
    metrics_path = os.path.join(ROOT_DIR, 'tests', 'files_for_tests', 'COMPAS_use_case',
                                'COMPAS_RF_expected_metrics.csv')
    return pd.read_csv(metrics_path, header=0)