From 81e2aa04829b526b2df9559e8791f550dbf78f34 Mon Sep 17 00:00:00 2001
From: rahulbshrestha
Date: Sat, 8 Jun 2024 11:25:59 +0200
Subject: [PATCH] Add _refute_once helper and parallelize the dummy outcome
 refuter

Signed-off-by: rahulbshrestha
---
 .../causal_refuters/dummy_outcome_refuter.py  | 314 ++++++++++--------
 1 file changed, 173 insertions(+), 141 deletions(-)

diff --git a/dowhy/causal_refuters/dummy_outcome_refuter.py b/dowhy/causal_refuters/dummy_outcome_refuter.py
index 7ca55e23eb..d18653763b 100644
--- a/dowhy/causal_refuters/dummy_outcome_refuter.py
+++ b/dowhy/causal_refuters/dummy_outcome_refuter.py
@@ -5,6 +5,7 @@
 import numpy as np
 import pandas as pd
+from joblib import Parallel, delayed
 from sklearn.ensemble import RandomForestRegressor
 from sklearn.linear_model import LinearRegression
 from sklearn.neighbors import KNeighborsRegressor
@@ -241,6 +242,151 @@ def refute_estimate(self, show_progress_bar: bool = False):
         return refutes
 
 
+def _refute_once(
+    data: pd.DataFrame,
+    estimate: CausalEstimate,
+    treatment_name: List[str],
+    outcome_name: str,
+    estimator_present: bool,
+    unobserved_confounder_values,
+    causal_effect_map,
+    identified_estimand,
+    test_fraction,
+    chosen_variables: Optional[List] = None,
+    transformation_list: List = DEFAULT_TRANSFORMATION,
+    true_causal_effect: Callable = DEFAULT_TRUE_CAUSAL_EFFECT,
+    min_data_point_threshold: float = MIN_DATA_POINT_THRESHOLD,
+    bucket_size_scale_factor: float = DEFAULT_BUCKET_SCALE_FACTOR,
+):
+    estimates = []
+
+    if estimator_present == False:
+        # Warn the user that the specified parameter is not applicable when no estimator is present in the transformation
+        if test_fraction != DEFAULT_TEST_FRACTION:
+            logger.warning("'test_fraction' is not applicable as there is no base treatment value.")
+
+        # Adding an unobserved confounder if provided by the user
+        if unobserved_confounder_values is not None:
+            data["simulated"] = unobserved_confounder_values
+            chosen_variables.append("simulated")
+        # There is no training step here, so X_train and outcome_train stay None
+        validation_df = data
+        X_train = None
+        outcome_train = None
+        X_validation_df = validation_df[chosen_variables]
+
+        X_validation = X_validation_df.values
+        outcome_validation = validation_df[outcome_name].values
+
+        # Get the final outcome, after running through all the values in the transformation list
+        outcome_validation = process_data(
+            outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list
+        )
+
+        # Check if the value of the true effect has already been stored
+        # We use None as the key as we have no base category for this refutation
+        if None not in causal_effect_map:
+            # As we currently support only one treatment
+            causal_effect_map[None] = true_causal_effect(validation_df[treatment_name[0]])
+
+        outcome_validation += causal_effect_map[None]
+
+        new_data = validation_df.assign(dummy_outcome=outcome_validation)
+
+        new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
+        new_estimator.fit(
+            new_data,
+            effect_modifier_names=estimate.estimator._effect_modifier_names,
+            **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
+        )
+        new_effect = new_estimator.estimate_effect(
+            new_data,
+            control_value=estimate.control_value,
+            treatment_value=estimate.treatment_value,
+            target_units=estimate.estimator._target_units,
+        )
+        estimates.append(new_effect.value)
+
+    else:
+        groups = preprocess_data_by_treatment(
+            data, treatment_name, unobserved_confounder_values, bucket_size_scale_factor, chosen_variables
+        )
+        group_count = 0
+
+        if len(test_fraction) == 1:
+            test_fraction = len(groups) * test_fraction
+
+        for key_train, _ in groups:
+            base_train = groups.get_group(key_train).sample(frac=test_fraction[group_count].base)
+            train_set = set([tuple(line) for line in base_train.values])
+            total_set = set([tuple(line) for line in groups.get_group(key_train).values])
+            base_validation = pd.DataFrame(list(total_set.difference(train_set)), columns=base_train.columns)
+            X_train_df = base_train[chosen_variables]
+
+            X_train = X_train_df.values
+            outcome_train = base_train[outcome_name].values
+
+            validation_df = []
+            transformation_list_temp = transformation_list
+            validation_df.append(base_validation)
+
+            for key_validation, _ in groups:
+                if key_validation != key_train:
+                    validation_df.append(groups.get_group(key_validation).sample(frac=test_fraction[group_count].other))
+
+            validation_df = pd.concat(validation_df)
+            X_validation_df = validation_df[chosen_variables]
+
+            X_validation = X_validation_df.values
+            outcome_validation = validation_df[outcome_name].values
+
+            # If the number of data points is too few, run the default transformation: [("zero",""),("noise", {'std_dev':1} )]
+            if X_train.shape[0] <= min_data_point_threshold:
+                transformation_list_temp = DEFAULT_TRANSFORMATION
+                logger.warning(
+                    "The number of data points in X_train:{} for category:{} is less than threshold:{}".format(
+                        X_train.shape[0], key_train, min_data_point_threshold
+                    )
+                )
+                logger.warning(
+                    "Therefore, defaulting to the minimal set of transformations:{}".format(transformation_list_temp)
+                )
+
+            outcome_validation = process_data(
+                outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list_temp
+            )
+
+            # Check if the value of the true effect has already been stored
+            # This ensures that we calculate the causal effect only once.
+            # We use key_train as we map data with respect to the base category of the data
+
+            if key_train not in causal_effect_map:
+                # As we currently support only one treatment
+                causal_effect_map[key_train] = true_causal_effect(validation_df[treatment_name[0]])
+
+            # Add h(t) to f(W) to get the dummy outcome
+            outcome_validation += causal_effect_map[key_train]
+
+            new_data = validation_df.assign(dummy_outcome=outcome_validation)
+            new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
+            new_estimator.fit(
+                new_data,
+                effect_modifier_names=estimate.estimator._effect_modifier_names,
+                **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
+            )
+            new_effect = new_estimator.estimate_effect(
+                new_data,
+                control_value=estimate.control_value,
+                treatment_value=estimate.treatment_value,
+                target_units=estimate.estimator._target_units,
+            )
+
+            estimates.append(new_effect.value)
+            group_count += 1
+
+    return estimates
+
+
 def refute_dummy_outcome(
     data: pd.DataFrame,
     target_estimand: IdentifiedEstimand,
@@ -256,6 +402,8 @@ def refute_dummy_outcome(
     unobserved_confounder_values: Optional[List] = DEFAULT_NEW_DATA_WITH_UNOBSERVED_CONFOUNDING,
     true_causal_effect: Callable = DEFAULT_TRUE_CAUSAL_EFFECT,
     show_progress_bar=False,
+    n_jobs: int = 1,
+    verbose: int = 0,
     **_,
 ) -> List[CausalRefutation]:
     """Refute an estimate by replacing the outcome with a simulated variable
@@ -447,159 +595,43 @@ def refute_dummy_outcome(
     # Train and the Validation Datasets. Thus, we run the simulation loop followed by the training and the validation
     # loops. Thus, we can get different values everytime we get the estimator.
-    # for _ in range( self._num_simulations ):
-    for _ in tqdm(
-        range(num_simulations),
-        colour=CausalRefuter.PROGRESS_BAR_COLOR,
-        disable=not show_progress_bar,
-        desc="Refuting Estimates: ",
-    ):
-        estimates = []
-
-        if estimator_present == False:
-
-            # Warn the user that the specified parameter is not applicable when no estimator is present in the transformation
-            if test_fraction != DEFAULT_TEST_FRACTION:
-                logger.warning("'test_fraction' is not applicable as there is no base treatment value.")
-
-            # Adding an unobserved confounder if provided by the user
-            if unobserved_confounder_values is not None:
-                data["simulated"] = unobserved_confounder_values
-                chosen_variables.append("simulated")
-            # We set X_train = 0 and outcome_train to be 0
-            validation_df = data
-            X_train = None
-            outcome_train = None
-            X_validation_df = validation_df[chosen_variables]
-
-            X_validation = X_validation_df.values
-            outcome_validation = validation_df[outcome_name].values
-
-            # Get the final outcome, after running through all the values in the transformation list
-            outcome_validation = process_data(
-                outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list
-            )
-
-            # Check if the value of true effect has been already stored
-            # We use None as the key as we have no base category for this refutation
-            if None not in causal_effect_map:
-                # As we currently support only one treatment
-                causal_effect_map[None] = true_causal_effect(validation_df[treatment_name[0]])
-
-            outcome_validation += causal_effect_map[None]
-
-            new_data = validation_df.assign(dummy_outcome=outcome_validation)
-
-            new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
-            new_estimator.fit(
-                new_data,
-                effect_modifier_names=estimate.estimator._effect_modifier_names,
-                **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
-            )
-            new_effect = new_estimator.estimate_effect(
-                new_data,
-                control_value=estimate.control_value,
-                treatment_value=estimate.treatment_value,
-                target_units=estimate.estimator._target_units,
-            )
-            estimates.append(new_effect.value)
-
-        else:
-
-            groups = preprocess_data_by_treatment(
-                data, treatment_name, unobserved_confounder_values, bucket_size_scale_factor, chosen_variables
-            )
-            group_count = 0
-
-            if len(test_fraction) == 1:
-                test_fraction = len(groups) * test_fraction
-
-            for key_train, _ in groups:
-                base_train = groups.get_group(key_train).sample(frac=test_fraction[group_count].base)
-                train_set = set([tuple(line) for line in base_train.values])
-                total_set = set([tuple(line) for line in groups.get_group(key_train).values])
-                base_validation = pd.DataFrame(list(total_set.difference(train_set)), columns=base_train.columns)
-                X_train_df = base_train[chosen_variables]
-
-                X_train = X_train_df.values
-                outcome_train = base_train[outcome_name].values
-
-                validation_df = []
-                transformation_list_temp = transformation_list
-                validation_df.append(base_validation)
-
-                for key_validation, _ in groups:
-                    if key_validation != key_train:
-                        validation_df.append(
-                            groups.get_group(key_validation).sample(frac=test_fraction[group_count].other)
-                        )
-
-                validation_df = pd.concat(validation_df)
-                X_validation_df = validation_df[chosen_variables]
-
-                X_validation = X_validation_df.values
-                outcome_validation = validation_df[outcome_name].values
-
-                # If the number of data points is too few, run the default transformation: [("zero",""),("noise", {'std_dev':1} )]
-                if X_train.shape[0] <= min_data_point_threshold:
-                    transformation_list_temp = DEFAULT_TRANSFORMATION
-                    logger.warning(
-                        "The number of data points in X_train:{} for category:{} is less than threshold:{}".format(
-                            X_train.shape[0], key_train, min_data_point_threshold
-                        )
-                    )
-                    logger.warning(
-                        "Therefore, defaulting to the minimal set of transformations:{}".format(
-                            transformation_list_temp
-                        )
-                    )
-
-                outcome_validation = process_data(
-                    outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list_temp
-                )
-
-                # Check if the value of true effect has been already stored
-                # This ensures that we calculate the causal effect only once.
-                # We use key_train as we map data with respect to the base category of the data
-
-                if key_train not in causal_effect_map:
-                    # As we currently support only one treatment
-                    causal_effect_map[key_train] = true_causal_effect(validation_df[treatment_name[0]])
-
-                # Add h(t) to f(W) to get the dummy outcome
-                outcome_validation += causal_effect_map[key_train]
-
-                new_data = validation_df.assign(dummy_outcome=outcome_validation)
-                new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
-                new_estimator.fit(
-                    new_data,
-                    effect_modifier_names=estimate.estimator._effect_modifier_names,
-                    **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
-                )
-                new_effect = new_estimator.estimate_effect(
-                    new_data,
-                    control_value=estimate.control_value,
-                    treatment_value=estimate.treatment_value,
-                    target_units=estimate.estimator._target_units,
-                )
-
-                estimates.append(new_effect.value)
-                group_count += 1
+    sample_estimates = Parallel(n_jobs=n_jobs, verbose=verbose)(
+        delayed(_refute_once)(
+            data=data,
+            estimate=estimate,
+            treatment_name=treatment_name,
+            outcome_name=outcome_name,
+            estimator_present=estimator_present,
+            unobserved_confounder_values=unobserved_confounder_values,
+            causal_effect_map=causal_effect_map,
+            identified_estimand=identified_estimand,
+            chosen_variables=chosen_variables,
+            transformation_list=transformation_list,
+            true_causal_effect=true_causal_effect,
+            min_data_point_threshold=min_data_point_threshold,
+            bucket_size_scale_factor=bucket_size_scale_factor,
+            test_fraction=test_fraction,
+        )
+        for _ in tqdm(
+            range(num_simulations),
+            colour=CausalRefuter.PROGRESS_BAR_COLOR,
+            disable=not show_progress_bar,
+            desc="Refuting Estimates: ",
+        )
+    )
+
-        simulation_results.append(estimates)
 
     # We convert to ndarray for ease in indexing
     # The data is of the form
     # sim1: cat1 cat2 ... catn
     # sim2: cat1 cat2 ... catn
-    simulation_results = np.array(simulation_results)
+    simulation_results = np.array(sample_estimates)
 
     # Note: We would like the causal_estimator to find the true causal estimate that we have specified through this
     # refuter. Let the value of the true causal effect be h(t). In the following section of code, we wish to find out if h(t) falls in the
     # distribution of the refuter.
 
     if estimator_present == False:
-
         dummy_estimate = CausalEstimate(
             data=None,
             treatment_name=estimate._treatment_name,
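
Reviewer note: below is a minimal usage sketch of the new parallel path. It assumes the standard DoWhy CausalModel workflow and that refute_estimate forwards extra keyword arguments (here n_jobs and verbose) through to refute_dummy_outcome via **kwargs; the synthetic dataset and its parameters are illustrative, not part of this patch.

import dowhy.datasets
from dowhy import CausalModel

# Synthetic data with a known linear treatment effect (illustrative parameters).
data = dowhy.datasets.linear_dataset(
    beta=10, num_common_causes=3, num_samples=1000, treatment_is_binary=True
)
model = CausalModel(
    data=data["df"],
    treatment=data["treatment_name"],
    outcome=data["outcome_name"],
    graph=data["gml_graph"],
)
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
estimate = model.estimate_effect(identified_estimand, method_name="backdoor.linear_regression")

# n_jobs=-1 lets joblib use all available cores; each of the num_simulations
# refutation runs executes as an independent _refute_once task.
refutations = model.refute_estimate(
    identified_estimand,
    estimate,
    method_name="dummy_outcome_refuter",
    num_simulations=20,
    n_jobs=-1,
    verbose=1,
)
print(refutations[0])

One behavioral point worth flagging: with joblib's default process-based (loky) backend, each worker receives its own copy of data and causal_effect_map, so the true-effect cache no longer persists across simulations; each simulation recomputes the true effect on its own validation sample rather than reusing the first simulation's cached value, as the old sequential loop did.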