From be59851ef1a546db6cc04872f92ab74b90b6f10b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Quintas-Mart=C3=ADnez?= Date: Tue, 14 May 2024 14:59:02 -0400 Subject: [PATCH] Run isort and black --- dowhy/gcm/distribution_change_robust.py | 329 +++++++++++-------- tests/gcm/test_distribution_change_robust.py | 70 ++-- 2 files changed, 226 insertions(+), 173 deletions(-) diff --git a/dowhy/gcm/distribution_change_robust.py b/dowhy/gcm/distribution_change_robust.py index 4bd04c59a1..b090fad295 100644 --- a/dowhy/gcm/distribution_change_robust.py +++ b/dowhy/gcm/distribution_change_robust.py @@ -8,16 +8,20 @@ import warnings from itertools import groupby -import numpy as np, pandas as pd, networkx as nx +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +import networkx as nx +import numpy as np +import pandas as pd from sklearn.base import is_classifier, is_regressor -from sklearn.model_selection import KFold from sklearn.linear_model import LinearRegression, LogisticRegression +from sklearn.model_selection import KFold, train_test_split from statsmodels.stats.weightstats import DescrStatsW + from dowhy.gcm.causal_models import ProbabilisticCausalModel -from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from dowhy.graph import DirectedGraph, node_connected_subgraph_view from dowhy.gcm.shapley import ShapleyConfig, estimate_shapley_values -from sklearn.model_selection import train_test_split +from dowhy.graph import DirectedGraph, node_connected_subgraph_view + class ThetaC: """Implements three estimators (regression, re-weighting, MR) for causal change attribution.""" @@ -37,7 +41,7 @@ def __init__(self, C, h_fn=lambda y: y, warn_th=1e-3): self.reg_dict = {} # A dictionary to store the trained regressors self.cla_dict = {} # A dictionary to store the trained classifiers self.calib_dict = {} # A dictionary to store the trained calibrators - self.alpha_dict = {} # A dictionary to store the fitted weights alpha_k (Theorem 2.4) + self.alpha_dict = {} # A dictionary to store the fitted weights alpha_k (Theorem 2.4) self.warn_th = warn_th # The threshold that generates warning about reweighting def _simplify_C(self, all_indep=False): @@ -65,11 +69,9 @@ def _simplify_C(self, all_indep=False): # Otherwise, we just group the consecutive values (Remark C.1). else: - self.C_simpl = [ - (c, list(inds)) for c, inds in groupby(range(len(self.C)), lambda i: self.C[i]) - ] + self.C_simpl = [(c, list(inds)) for c, inds in groupby(range(len(self.C)), lambda i: self.C[i])] - self.K_simpl = len(self.C_simpl)-1 + self.K_simpl = len(self.C_simpl) - 1 def _train_reg( self, @@ -103,28 +105,29 @@ def _train_reg( # Train gamma_K: ind = T_train == self.C_simpl[-1][0] # Select sample C_{K+1} \in {0,1} var = [a for b in self.C_simpl[:-1] for a in b[1]] # Select right variables - self.reg_dict[self.K_simpl-1] = regressor(*regressor_args, **regressor_kwargs) + self.reg_dict[self.K_simpl - 1] = regressor(*regressor_args, **regressor_kwargs) if w_train is not None: - self.reg_dict[self.K_simpl-1].fit( - X_train[np.ix_(ind, var)], self.h_fn(y_train[ind]), sample_weight = w_train[ind], - **regressor_fit_kwargs) + self.reg_dict[self.K_simpl - 1].fit( + X_train[np.ix_(ind, var)], self.h_fn(y_train[ind]), sample_weight=w_train[ind], **regressor_fit_kwargs + ) else: - self.reg_dict[self.K_simpl-1].fit( - X_train[np.ix_(ind, var)], self.h_fn(y_train[ind]), **regressor_fit_kwargs) + self.reg_dict[self.K_simpl - 1].fit( + X_train[np.ix_(ind, var)], self.h_fn(y_train[ind]), **regressor_fit_kwargs + ) # Train gamma_k for k = K-1, K-2, ..., 1: - for k in range(2, self.K_simpl+1): + for k in range(2, self.K_simpl + 1): ind = T_train == self.C_simpl[-k][0] # Select sample C_{k+1} \in {0,1} var_new = [a for b in self.C_simpl[:-k] for a in b[1]] # Select right variables # Use the fitted values from previous regression - new_y = self.reg_dict[self.K_simpl-k+1].predict(X_train[np.ix_(ind, var)]) - self.reg_dict[self.K_simpl-k] = regressor(*regressor_args, **regressor_kwargs) + new_y = self.reg_dict[self.K_simpl - k + 1].predict(X_train[np.ix_(ind, var)]) + self.reg_dict[self.K_simpl - k] = regressor(*regressor_args, **regressor_kwargs) if w_train is not None: - self.reg_dict[self.K_simpl-k].fit(X_train[np.ix_(ind, var_new)], new_y, sample_weight = w_train[ind], - **regressor_fit_kwargs) + self.reg_dict[self.K_simpl - k].fit( + X_train[np.ix_(ind, var_new)], new_y, sample_weight=w_train[ind], **regressor_fit_kwargs + ) else: - self.reg_dict[self.K_simpl-k].fit(X_train[np.ix_(ind, var_new)], new_y, - **regressor_fit_kwargs) + self.reg_dict[self.K_simpl - k].fit(X_train[np.ix_(ind, var_new)], new_y, **regressor_fit_kwargs) var = var_new def _train_cla( @@ -180,27 +183,27 @@ def _train_cla( # Train classifiers that will go into alpha_k for k = 1, ..., K: for k in range(self.K_simpl): - var = [a for b in self.C_simpl[:(k+1)] for a in b[1]] # Select right variables + var = [a for b in self.C_simpl[: (k + 1)] for a in b[1]] # Select right variables self.cla_dict[k] = classifier(*classifier_args, **classifier_kwargs) if w_train is not None: - self.cla_dict[k].fit(X_train[:, var], T_train, sample_weight = w_train, **classifier_fit_kwargs) + self.cla_dict[k].fit(X_train[:, var], T_train, sample_weight=w_train, **classifier_fit_kwargs) else: self.cla_dict[k].fit(X_train[:, var], T_train, **classifier_fit_kwargs) # For the case where you want to calibrate on different data, # No need if classifier is CalibratedClassifierCv - if calibrator is not None: + if calibrator is not None: proba = self.cla_dict[k].predict_proba(X_calib[:, var])[:, [1]] self.calib_dict[k] = calibrator(*calibrator_args, **calibrator_kwargs) if w_train is not None: - self.calib_dict[k].fit(proba, T_calib, sample_weight = w_calib, **calibrator_fit_kwargs) + self.calib_dict[k].fit(proba, T_calib, sample_weight=w_calib, **calibrator_fit_kwargs) else: self.calib_dict[k].fit(proba, T_calib, **calibrator_fit_kwargs) var = [a for b in self.C_simpl[:-1] for a in b[1]] # Select right variables - p = self.cla_dict[self.K_simpl-1].predict_proba(X_eval[:, var])[:, 1] + p = self.cla_dict[self.K_simpl - 1].predict_proba(X_eval[:, var])[:, 1] - if np.min(p) < self.warn_th or np.max(p) > 1-self.warn_th: + if np.min(p) < self.warn_th or np.max(p) > 1 - self.warn_th: warnings.warn( f"min P(T = 1 | X) = {np.min(p) :.2f}, max P(T = 1 | X) = {np.max(p) :.2f}, indicating low overlap. \n" + "Consider increasing the regularization for the classificator or using method = 'regression'." @@ -209,7 +212,7 @@ def _train_cla( def _get_alphas(self, X_eval, T_eval, ratio, calibrator=None, crop=1e-3): """ This helper function uses the classifiers (and, if appropriate, the probability calibrators) - to compute the weights alpha_k (defined in Theorem 2.4 of the paper), + to compute the weights alpha_k (defined in Theorem 2.4 of the paper), which are then stored in self.alpha_dict. Inputs: @@ -230,48 +233,52 @@ def _get_alphas(self, X_eval, T_eval, ratio, calibrator=None, crop=1e-3): """ for k in range(self.K_simpl): - ind = T_eval == self.C_simpl[k+1][0] # Select sample C_{k+1} \in {0,1} + ind = T_eval == self.C_simpl[k + 1][0] # Select sample C_{k+1} \in {0,1} # k = 0 doesn't have parents, get the marginal RN derivative. if k == 0: var = self.C_simpl[0][1] # Select right variables - p = np.minimum(np.maximum(self.cla_dict[0].predict_proba(X_eval[np.ix_(ind, var)])[:, 1], - crop), 1-crop) + p = np.minimum( + np.maximum(self.cla_dict[0].predict_proba(X_eval[np.ix_(ind, var)])[:, 1], crop), 1 - crop + ) if calibrator is not None and is_regressor(calibrator): - p = np.minimum(np.maximum(self.calib_dict[0].predict(p[:, np.newaxis]), crop), 1-crop) + p = np.minimum(np.maximum(self.calib_dict[0].predict(p[:, np.newaxis]), crop), 1 - crop) elif calibrator is not None and is_classifier(calibrator): - p = np.minimum(np.maximum(self.calib_dict[0].predict_proba(p[:, np.newaxis])[:, 1], - crop), 1-crop) - w = (1.0-p)/p * ratio if self.C_simpl[k+1][0] else p/(1.0-p) * 1/ratio + p = np.minimum(np.maximum(self.calib_dict[0].predict_proba(p[:, np.newaxis])[:, 1], crop), 1 - crop) + w = (1.0 - p) / p * ratio if self.C_simpl[k + 1][0] else p / (1.0 - p) * 1 / ratio # For k > 0 get the conditional RN derivative dividing the RN derivative for \bar{X}_j # by the RN derivative for \bar{X}_{j-1}. else: - var_joint = [a for b in self.C_simpl[:(k+1)] for a in b[1]] # Variables up to k - p_joint = np.minimum(np.maximum(self.cla_dict[k].predict_proba(X_eval[np.ix_(ind, var_joint)])[:, 1], - crop), 1-crop) + var_joint = [a for b in self.C_simpl[: (k + 1)] for a in b[1]] # Variables up to k + p_joint = np.minimum( + np.maximum(self.cla_dict[k].predict_proba(X_eval[np.ix_(ind, var_joint)])[:, 1], crop), 1 - crop + ) if calibrator is not None and is_regressor(calibrator): - p_joint = np.minimum(np.maximum(self.calib_dict[k].predict(p_joint[:, np.newaxis]), - crop), 1-crop) + p_joint = np.minimum(np.maximum(self.calib_dict[k].predict(p_joint[:, np.newaxis]), crop), 1 - crop) elif calibrator is not None and is_classifier(calibrator): - p_joint = np.minimum(np.maximum(self.calib_dict[k].predict_proba(p_joint[:, np.newaxis])[:, 1], - crop), 1-crop) - w_joint = (1-p_joint)/p_joint if self.C_simpl[k+1][0] else p_joint/(1-p_joint) + p_joint = np.minimum( + np.maximum(self.calib_dict[k].predict_proba(p_joint[:, np.newaxis])[:, 1], crop), 1 - crop + ) + w_joint = (1 - p_joint) / p_joint if self.C_simpl[k + 1][0] else p_joint / (1 - p_joint) var_cond = [a for b in self.C_simpl[:k] for a in b[1]] # Variables up to k-1 - p_cond = np.minimum(np.maximum(self.cla_dict[k-1].predict_proba(X_eval[np.ix_(ind, var_cond)])[:, 1], - crop), 1-crop) + p_cond = np.minimum( + np.maximum(self.cla_dict[k - 1].predict_proba(X_eval[np.ix_(ind, var_cond)])[:, 1], crop), 1 - crop + ) if calibrator is not None and is_regressor(calibrator): - p_cond = np.minimum(np.maximum(self.calib_dict[k-1].predict(p_cond[:, np.newaxis]), - crop), 1-crop) + p_cond = np.minimum( + np.maximum(self.calib_dict[k - 1].predict(p_cond[:, np.newaxis]), crop), 1 - crop + ) if calibrator is not None and is_classifier(calibrator): - p = np.minimum(np.maximum(self.calib_dict[k-1].predict_proba(p_cond[:, np.newaxis])[:, 1], - crop), 1-crop) - w_cond = p_cond/(1-p_cond) if self.C_simpl[k+1][0] else (1-p_cond)/p_cond + p = np.minimum( + np.maximum(self.calib_dict[k - 1].predict_proba(p_cond[:, np.newaxis])[:, 1], crop), 1 - crop + ) + w_cond = p_cond / (1 - p_cond) if self.C_simpl[k + 1][0] else (1 - p_cond) / p_cond w = w_joint * w_cond - self.alpha_dict[k] = w * self.alpha_dict[k-2] if k-2 in self.alpha_dict.keys() else w + self.alpha_dict[k] = w * self.alpha_dict[k - 2] if k - 2 in self.alpha_dict.keys() else w # alpha_k should integrate to 1. In small samples this might not be the case, so we standardize: self.alpha_dict[k] /= np.mean(self.alpha_dict[k]) @@ -351,7 +358,7 @@ def est_scores( Returns: theta_scores = (n_eval,) np.array of scores, such that theta_hat = np.mean(theta_scores). """ - + regressor_kwargs = {} if regressor_kwargs is None else regressor_kwargs regressor_fit_kwargs = {} if regressor_fit_kwargs is None else regressor_fit_kwargs classifier_kwargs = {} if classifier_kwargs is None else classifier_kwargs @@ -360,11 +367,11 @@ def est_scores( calibrator_fit_kwargs = {} if calibrator_fit_kwargs is None else calibrator_fit_kwargs if w_eval is None: - n0, n1, n = np.sum(1-T_eval), np.sum(T_eval), T_eval.shape[0] + n0, n1, n = np.sum(1 - T_eval), np.sum(T_eval), T_eval.shape[0] else: - n0, n1, n = np.sum(w_eval*(1-T_eval)), np.sum(w_eval*T_eval), np.sum(w_eval) + n0, n1, n = np.sum(w_eval * (1 - T_eval)), np.sum(w_eval * T_eval), np.sum(w_eval) - if len(self.C) != X_train.shape[1]+1: + if len(self.C) != X_train.shape[1] + 1: raise ValueError(f"len(C) must be K+1={X_train.shape[1]+1}, not {len(self.C)}") self._simplify_C(all_indep=all_indep) @@ -387,15 +394,15 @@ def est_scores( if self.C_simpl[0][0] == 1: theta_scores = np.concatenate( ( - np.zeros_like(y_eval[T_eval==0]), - self.reg_dict[0].predict(X_eval[np.ix_(ind, var)])*n/n1, + np.zeros_like(y_eval[T_eval == 0]), + self.reg_dict[0].predict(X_eval[np.ix_(ind, var)]) * n / n1, ) ) else: theta_scores = np.concatenate( ( - self.reg_dict[0].predict(X_eval[np.ix_(ind, var)])*n/n0, - np.zeros_like(y_eval[T_eval==1]), + self.reg_dict[0].predict(X_eval[np.ix_(ind, var)]) * n / n0, + np.zeros_like(y_eval[T_eval == 1]), ) ) @@ -418,34 +425,34 @@ def est_scores( calibrator_fit_kwargs=calibrator_fit_kwargs, ) - if 'class_weight' not in classifier_kwargs: - ratio = n1/n0 - elif classifier_kwargs['class_weight'] == 'balanced': + if "class_weight" not in classifier_kwargs: + ratio = n1 / n0 + elif classifier_kwargs["class_weight"] == "balanced": ratio = 1 else: - ratio = n1/n0 * classifier_kwargs['class_weight'][0]/classifier_kwargs['class_weight'][1] - - self._get_alphas( - X_eval, - T_eval, - ratio, - calibrator=calibrator, - crop=crop - ) - + ratio = n1 / n0 * classifier_kwargs["class_weight"][0] / classifier_kwargs["class_weight"][1] + + self._get_alphas(X_eval, T_eval, ratio, calibrator=calibrator, crop=crop) + ind = T_eval == self.C_simpl[-1][0] # Select sample C_{K+1} \in {0,1} if self.C_simpl[-1][0] == 1: theta_scores = np.concatenate( - (np.zeros_like(y_eval[T_eval==0]), self.alpha_dict[self.K_simpl-1] * self.h_fn(y_eval[ind])*n/n1) + ( + np.zeros_like(y_eval[T_eval == 0]), + self.alpha_dict[self.K_simpl - 1] * self.h_fn(y_eval[ind]) * n / n1, + ) ) else: theta_scores = np.concatenate( - (self.alpha_dict[self.K_simpl-1] * self.h_fn(y_eval[ind])*n/n0, np.zeros_like(y_eval[T_eval==1])) + ( + self.alpha_dict[self.K_simpl - 1] * self.h_fn(y_eval[ind]) * n / n0, + np.zeros_like(y_eval[T_eval == 1]), + ) ) elif method == "MR": - theta_scores_0 = np.zeros_like(y_eval[T_eval==0]) - theta_scores_1 = np.zeros_like(y_eval[T_eval==1]) + theta_scores_0 = np.zeros_like(y_eval[T_eval == 0]) + theta_scores_1 = np.zeros_like(y_eval[T_eval == 1]) # Regression base estimate: self._train_reg( @@ -485,60 +492,54 @@ def est_scores( calibrator_fit_kwargs=calibrator_fit_kwargs, ) - if 'class_weight' not in classifier_kwargs: - ratio = n1/n0 - elif classifier_kwargs['class_weight'] == 'balanced': + if "class_weight" not in classifier_kwargs: + ratio = n1 / n0 + elif classifier_kwargs["class_weight"] == "balanced": ratio = 1 else: - ratio = n1/n0 * classifier_kwargs['class_weight'][1]/classifier_kwargs['class_weight'][0] - - self._get_alphas( - X_eval, - T_eval, - ratio, - calibrator=calibrator, - crop=crop - ) + ratio = n1 / n0 * classifier_kwargs["class_weight"][1] / classifier_kwargs["class_weight"][0] + + self._get_alphas(X_eval, T_eval, ratio, calibrator=calibrator, crop=crop) for k in range(self.K_simpl): - ind = T_eval == self.C_simpl[k+1][0] # Select sample C_{k+1} \in {0,1} - var = [a for b in self.C_simpl[:(k+1)] for a in b[1]] # Variables up to k - var_next = [a for b in self.C_simpl[:(k+2)] for a in b[1]] # Variables up to k+1 - if self.C_simpl[k+1][0] == 1: - if k < self.K_simpl-1: + ind = T_eval == self.C_simpl[k + 1][0] # Select sample C_{k+1} \in {0,1} + var = [a for b in self.C_simpl[: (k + 1)] for a in b[1]] # Variables up to k + var_next = [a for b in self.C_simpl[: (k + 2)] for a in b[1]] # Variables up to k+1 + if self.C_simpl[k + 1][0] == 1: + if k < self.K_simpl - 1: theta_scores_1 += self.alpha_dict[k] * ( - self.reg_dict[k+1].predict(X_eval[np.ix_(ind, var_next)]) + self.reg_dict[k + 1].predict(X_eval[np.ix_(ind, var_next)]) - self.reg_dict[k].predict(X_eval[np.ix_(ind, var)]) ) else: theta_scores_1 += self.alpha_dict[k] * ( self.h_fn(y_eval[ind]) - - self.reg_dict[self.K_simpl-1].predict(X_eval[np.ix_(ind, var)]) + - self.reg_dict[self.K_simpl - 1].predict(X_eval[np.ix_(ind, var)]) ) else: - if k < self.K_simpl-1: + if k < self.K_simpl - 1: theta_scores_0 += self.alpha_dict[k] * ( - self.reg_dict[k+1].predict(X_eval[np.ix_(ind, var_next)]) + self.reg_dict[k + 1].predict(X_eval[np.ix_(ind, var_next)]) - self.reg_dict[k].predict(X_eval[np.ix_(ind, var)]) ) else: theta_scores_0 += self.alpha_dict[k] * ( self.h_fn(y_eval[ind]) - - self.reg_dict[self.K_simpl-1].predict(X_eval[np.ix_(ind, var)]) + - self.reg_dict[self.K_simpl - 1].predict(X_eval[np.ix_(ind, var)]) ) - theta_scores = np.concatenate((theta_scores_0*n/n0, theta_scores_1*n/n1)) + theta_scores = np.concatenate((theta_scores_0 * n / n0, theta_scores_1 * n / n1)) else: raise AttributeError(f'Method "{method}" Not Implemented') # When C = [1, 1, ..., 1] we can just take the sample mean of y_eval[T_eval == 1] elif self.C_simpl[0][0] == 1: - theta_scores = np.concatenate((np.zeros_like(y_eval[T_eval==0]), self.h_fn(y_eval[T_eval == 1])*n/n1)) + theta_scores = np.concatenate((np.zeros_like(y_eval[T_eval == 0]), self.h_fn(y_eval[T_eval == 1]) * n / n1)) # When C = [0, 0, ..., 0] we can just take the sample mean of y_eval[T_eval == 0] else: - theta_scores = np.concatenate((self.h_fn(y_eval[T_eval == 0])*n/n0, np.zeros_like(y_eval[T_eval==1]))) + theta_scores = np.concatenate((self.h_fn(y_eval[T_eval == 0]) * n / n0, np.zeros_like(y_eval[T_eval == 1]))) return theta_scores @@ -571,7 +572,6 @@ def est_theta( all_indep=False, crop=1e-3, ): - """ This function computes the scores that are averaged to get each theta_hat, and then returns (theta_hat, std_error) @@ -655,13 +655,14 @@ def est_theta( ) if w_eval is not None: - w_sort = np.concatenate((w_eval[T_eval==0], w_eval[T_eval==1])) # Order weights in same way as scores + w_sort = np.concatenate((w_eval[T_eval == 0], w_eval[T_eval == 1])) # Order weights in same way as scores else: w_sort = np.ones(np.shape(T_eval)) weighted_stats = DescrStatsW(theta_scores, weights=w_sort, ddof=0) return weighted_stats.mean, weighted_stats.std_mean + def distribution_change_robust( causal_model: ProbabilisticCausalModel, old_data: pd.DataFrame, @@ -670,9 +671,9 @@ def distribution_change_robust( sample_weight=None, xfit=True, xfit_folds=5, - train_size=0.5, + train_size=0.5, calib_size=0.0, - split_random_state = 0, + split_random_state=0, method="MR", # One of 'regression', 're-weighting', 'MR', regressor=LinearRegression, regressor_args=(), @@ -690,11 +691,10 @@ def distribution_change_robust( crop=1e-3, shapley_config: Optional[ShapleyConfig] = None, ): - """ This function computes the Shapley values for attribution of change in the mean of target_node to nodes upstream in the causal DAG, using the multiply-robust method from: - Quintas-Martinez, V., Bahadori, M. T., Santiago, E., Mu, J., Janzing, D., and Heckerman, D. + Quintas-Martinez, V., Bahadori, M. T., Santiago, E., Mu, J., Janzing, D., and Heckerman, D. Multiply-Robust Causal Change Attribution (ICML 2024) :param causal_model: Reference causal model. @@ -758,33 +758,47 @@ def distribution_change_robust( if not xfit: if sample_weight is None: - X_train, X_eval, y_train, y_eval, T_train, T_eval = train_test_split(X, y, T, train_size = train_size, - stratify = T, random_state = split_random_state) + X_train, X_eval, y_train, y_eval, T_train, T_eval = train_test_split( + X, y, T, train_size=train_size, stratify=T, random_state=split_random_state + ) X_calib, T_calib = None, None - + if calibrator is not None: if calib_size == 0.0: - raise ValueError('For calibration, calib_size should be either positive and smaller than the number of samples or a float in the (0, 1) range.') - - X_calib, X_train, _, y_train, T_calib, T_train = train_test_split(X_train, y_train, T_train, train_size = calib_size, - stratify = T_train, random_state = split_random_state) - + raise ValueError( + "For calibration, calib_size should be either positive and smaller than the number of samples or a float in the (0, 1) range." + ) + + X_calib, X_train, _, y_train, T_calib, T_train = train_test_split( + X_train, y_train, T_train, train_size=calib_size, stratify=T_train, random_state=split_random_state + ) + w_eval, w_train, w_calib = None, None, None - + else: w = np.concatenate((old_data[sample_weight].values.flatten(), new_data[sample_weight].values.flatten())) - - X_train, X_eval, y_train, y_eval, T_train, T_eval, w_train, w_eval = train_test_split(X, y, T, w, train_size = train_size, - stratify = T, random_state = split_random_state) + + X_train, X_eval, y_train, y_eval, T_train, T_eval, w_train, w_eval = train_test_split( + X, y, T, w, train_size=train_size, stratify=T, random_state=split_random_state + ) X_calib, T_calib, w_calib = None, None, None - + if calibrator is not None: if calib_size == 0.0: - raise ValueError('For calibration, calib_size should be either positive and smaller than the number of samples or a float in the (0, 1) range.') - - X_calib, X_train, _, y_train, T_calib, T_train, w_calib, w_train = train_test_split(X_train, y_train, T_train, w_train, train_size = calib_size, - stratify = T_train, random_state = split_random_state) - + raise ValueError( + "For calibration, calib_size should be either positive and smaller than the number of samples or a float in the (0, 1) range." + ) + + X_calib, X_train, _, y_train, T_calib, T_train, w_calib, w_train = train_test_split( + X_train, + y_train, + T_train, + w_train, + train_size=calib_size, + stratify=T_train, + random_state=split_random_state, + ) + def set_func(C): return ThetaC(C, warn_th=0.0).est_theta( X_eval, @@ -812,33 +826,59 @@ def set_func(C): calibrator_kwargs=calibrator_kwargs, calibrator_fit_kwargs=calibrator_fit_kwargs, all_indep=all_indep, - crop=crop + crop=crop, )[0] - + attributions = estimate_shapley_values(set_func, len(sorted_var_names), shapley_config) if xfit: - kf = KFold(n_splits = xfit_folds, shuffle = True, random_state = split_random_state) + kf = KFold(n_splits=xfit_folds, shuffle=True, random_state=split_random_state) attributions = np.zeros(len(sorted_var_names)) - - for (train_index, test_index) in kf.split(X): - X_train, X_eval, y_train, y_eval, T_train, T_eval = X[train_index], X[test_index], y[train_index], y[test_index], T[train_index], T[test_index] - w = np.concatenate((old_data[sample_weight].values.flatten(), new_data[sample_weight].values.flatten())) if sample_weight is not None else None + + for train_index, test_index in kf.split(X): + X_train, X_eval, y_train, y_eval, T_train, T_eval = ( + X[train_index], + X[test_index], + y[train_index], + y[test_index], + T[train_index], + T[test_index], + ) + w = ( + np.concatenate((old_data[sample_weight].values.flatten(), new_data[sample_weight].values.flatten())) + if sample_weight is not None + else None + ) w_train, w_eval = (w[train_index], w[test_index]) if sample_weight is not None else (None, None) - + X_calib, T_calib, w_calib = None, None, None - + if calibrator is not None: if calib_size == 0.0: - raise ValueError('For calibration, calib_size should be either positive and smaller than the number of samples or a float in the (0, 1) range.') + raise ValueError( + "For calibration, calib_size should be either positive and smaller than the number of samples or a float in the (0, 1) range." + ) if sample_weight is None: - X_calib, X_train, _, y_train, T_calib, T_train = train_test_split(X_train, y_train, T_train, train_size = calib_size, - stratify = T_train, random_state = split_random_state) + X_calib, X_train, _, y_train, T_calib, T_train = train_test_split( + X_train, + y_train, + T_train, + train_size=calib_size, + stratify=T_train, + random_state=split_random_state, + ) else: - X_calib, X_train, _, y_train, T_calib, T_train, w_calib, w_train = train_test_split(X_train, y_train, T_train, w_train, train_size = calib_size, - stratify = T_train, random_state = split_random_state) + X_calib, X_train, _, y_train, T_calib, T_train, w_calib, w_train = train_test_split( + X_train, + y_train, + T_train, + w_train, + train_size=calib_size, + stratify=T_train, + random_state=split_random_state, + ) def set_func(C): return ThetaC(C, warn_th=0.0).est_theta( @@ -867,10 +907,9 @@ def set_func(C): calibrator_kwargs=calibrator_kwargs, calibrator_fit_kwargs=calibrator_fit_kwargs, all_indep=all_indep, - crop=crop + crop=crop, )[0] - attributions += estimate_shapley_values(set_func, len(sorted_var_names), shapley_config)/xfit_folds - - + attributions += estimate_shapley_values(set_func, len(sorted_var_names), shapley_config) / xfit_folds + return {x: attributions[i] for i, x in enumerate(sorted_var_names)} diff --git a/tests/gcm/test_distribution_change_robust.py b/tests/gcm/test_distribution_change_robust.py index 46466dd411..cf1f480848 100644 --- a/tests/gcm/test_distribution_change_robust.py +++ b/tests/gcm/test_distribution_change_robust.py @@ -1,46 +1,60 @@ -import numpy as np, pandas as pd, networkx as nx -from dowhy.gcm.distribution_change_robust import distribution_change_robust -from dowhy.gcm.shapley import * -from dowhy import gcm -from sklearn.ensemble import GradientBoostingRegressor, GradientBoostingClassifier -from sklearn.linear_model import LogisticRegression +import networkx as nx +import numpy as np +import pandas as pd from flaky import flaky from pytest import approx +from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor +from sklearn.linear_model import LogisticRegression -def _gen_data(seed=0, N=1000): +from dowhy import gcm +from dowhy.gcm.distribution_change_robust import distribution_change_robust +from dowhy.gcm.shapley import * + + +def _gen_data(seed=0, N=1000): np.random.seed(seed) - X1_old = np.random.normal(1,1,N) - X2_old = 0.5*X1_old + np.random.normal(0,1,N) - Y_old = X1_old + X2_old + 0.25*X1_old**2 + 0.25*X2_old**2 + np.random.normal(0,1,N) - X1_new = np.random.normal(1,1.1,N) - X2_new = 0.2*X1_new + np.random.normal(0,1,N) - Y_new = X1_new + X2_new + 0.25*X1_new**2 - 0.25*X2_new**2 + np.random.normal(0,1,N) - data_old = pd.DataFrame({'X1' : X1_old, 'X2' : X2_old, 'Y' : Y_old}) - data_new = pd.DataFrame({'X1' : X1_new, 'X2' : X2_new, 'Y' : Y_new}) + X1_old = np.random.normal(1, 1, N) + X2_old = 0.5 * X1_old + np.random.normal(0, 1, N) + Y_old = X1_old + X2_old + 0.25 * X1_old**2 + 0.25 * X2_old**2 + np.random.normal(0, 1, N) + X1_new = np.random.normal(1, 1.1, N) + X2_new = 0.2 * X1_new + np.random.normal(0, 1, N) + Y_new = X1_new + X2_new + 0.25 * X1_new**2 - 0.25 * X2_new**2 + np.random.normal(0, 1, N) + data_old = pd.DataFrame({"X1": X1_old, "X2": X2_old, "Y": Y_old}) + data_new = pd.DataFrame({"X1": X1_new, "X2": X2_new, "Y": Y_new}) return data_old, data_new + def _true_theta(C): - mX0, mX0sq = (1, 2.21) if C[0] else (1,2) - mX1, mX1sq = (0.2*mX0,0.2**2*mX0sq+1) if C[1] else (0.5*mX0,0.5**2*mX0sq+1) - mY = mX0 + mX1 + 0.25*mX0sq - 0.25*mX1sq if C[2] else mX0 + mX1 + 0.25*mX0sq + 0.25*mX1sq + mX0, mX0sq = (1, 2.21) if C[0] else (1, 2) + mX1, mX1sq = (0.2 * mX0, 0.2**2 * mX0sq + 1) if C[1] else (0.5 * mX0, 0.5**2 * mX0sq + 1) + mY = mX0 + mX1 + 0.25 * mX0sq - 0.25 * mX1sq if C[2] else mX0 + mX1 + 0.25 * mX0sq + 0.25 * mX1sq return mY -kwargs = {'regressor' : GradientBoostingRegressor, 'regressor_kwargs' : {'random_state' : 0}, - 'classifier' : GradientBoostingClassifier, 'classifier_kwargs' : {'random_state' : 0}, - 'calibrator' : LogisticRegression, 'calibrator_kwargs' : {'penalty' : None}, - 'calib_size' : 0.2, - 'xfit' : True, 'xfit_folds' : 10, - } + +kwargs = { + "regressor": GradientBoostingRegressor, + "regressor_kwargs": {"random_state": 0}, + "classifier": GradientBoostingClassifier, + "classifier_kwargs": {"random_state": 0}, + "calibrator": LogisticRegression, + "calibrator_kwargs": {"penalty": None}, + "calib_size": 0.2, + "xfit": True, + "xfit_folds": 10, +} + @flaky(max_runs=5) def test_given_two_data_sets_with_different_mechanisms_when_evaluate_distribution_change_then_returns_expected_result(): data_old, data_new = _gen_data() - causal_model = gcm.ProbabilisticCausalModel(nx.DiGraph([('X1', 'X2'), ('X1', 'Y'), ('X2', 'Y')])) + causal_model = gcm.ProbabilisticCausalModel(nx.DiGraph([("X1", "X2"), ("X1", "Y"), ("X2", "Y")])) gcm.auto.assign_causal_mechanisms(causal_model, data_old) - shap = distribution_change_robust(causal_model, data_old, data_new, 'Y', **kwargs) + shap = distribution_change_robust(causal_model, data_old, data_new, "Y", **kwargs) - true_shap = estimate_shapley_values(_true_theta, 3, ShapleyConfig(approximation_method = ShapleyApproximationMethods.EXACT)) + true_shap = estimate_shapley_values( + _true_theta, 3, ShapleyConfig(approximation_method=ShapleyApproximationMethods.EXACT) + ) assert shap["X1"] == approx(true_shap[0], abs=0.1) assert shap["X2"] == approx(true_shap[1], abs=0.1) - assert shap["Y"] == approx(true_shap[2], abs=0.1) \ No newline at end of file + assert shap["Y"] == approx(true_shap[2], abs=0.1)