diff --git a/econml/dml/_rlearner.py b/econml/dml/_rlearner.py index 6eca79aab..2effb3d17 100644 --- a/econml/dml/_rlearner.py +++ b/econml/dml/_rlearner.py @@ -98,8 +98,8 @@ def __init__(self, model_final): def fit(self, Y, T, X=None, W=None, Z=None, nuisances=None, sample_weight=None, freq_weight=None, sample_var=None, groups=None): Y_res, T_res = nuisances - self._model_final.fit(X, T, T_res, Y_res, sample_weight=sample_weight, - freq_weight=freq_weight, sample_var=sample_var) + self._model_final.fit(X, T, T_res, Y_res, **(filter_none_kwargs(sample_weight=sample_weight, + freq_weight=freq_weight, sample_var=sample_var, groups=groups))) return self def predict(self, X=None): diff --git a/econml/inference/_inference.py b/econml/inference/_inference.py index 87b9538b7..cb001383e 100644 --- a/econml/inference/_inference.py +++ b/econml/inference/_inference.py @@ -465,22 +465,28 @@ class StatsModelsInference(LinearModelFinalInference): ---------- cov_type : str, default 'HC1' The type of covariance estimation method to use. Supported values are 'nonrobust', - 'HC0', 'HC1'. + 'HC0', 'HC1', 'clustered'. + cov_options : dict, optional + Additional options for covariance estimation. For clustered covariance, supports: + - 'group_correction': bool, default True. Whether to apply N_G/(N_G-1) correction. + - 'df_correction': bool, default True. Whether to apply (N-1)/(N-K) correction. """ - def __init__(self, cov_type='HC1'): - if cov_type not in ['nonrobust', 'HC0', 'HC1']: + def __init__(self, cov_type='HC1', cov_options=None): + if cov_type not in ['nonrobust', 'HC0', 'HC1', 'clustered']: raise ValueError("Unsupported cov_type; " "must be one of 'nonrobust', " - "'HC0', 'HC1'") + "'HC0', 'HC1', 'clustered'") self.cov_type = cov_type + self.cov_options = cov_options if cov_options is not None else {} def prefit(self, estimator, *args, **kwargs): super().prefit(estimator, *args, **kwargs) assert not (self.model_final.fit_intercept), ("Inference can only be performed on models linear in " "their features, but here fit_intercept is True") self.model_final.cov_type = self.cov_type + self.model_final.cov_options = self.cov_options class GenericModelFinalInferenceDiscrete(Inference): @@ -660,21 +666,27 @@ class StatsModelsInferenceDiscrete(LinearModelFinalInferenceDiscrete): ---------- cov_type : str, default 'HC1' The type of covariance estimation method to use. Supported values are 'nonrobust', - 'HC0', 'HC1'. + 'HC0', 'HC1', 'clustered'. + cov_options : dict, optional + Additional options for covariance estimation. For clustered covariance, supports: + - 'group_correction': bool, default True. Whether to apply N_G/(N_G-1) correction. + - 'df_correction': bool, default True. Whether to apply (N-1)/(N-K) correction. 
""" - def __init__(self, cov_type='HC1'): - if cov_type not in ['nonrobust', 'HC0', 'HC1']: + def __init__(self, cov_type='HC1', cov_options=None): + if cov_type not in ['nonrobust', 'HC0', 'HC1', 'clustered']: raise ValueError("Unsupported cov_type; " "must be one of 'nonrobust', " - "'HC0', 'HC1'") + "'HC0', 'HC1', 'clustered'") self.cov_type = cov_type + self.cov_options = cov_options if cov_options is not None else {} def prefit(self, estimator, *args, **kwargs): super().prefit(estimator, *args, **kwargs) # need to set the fit args before the estimator is fit self.model_final.cov_type = self.cov_type + self.model_final.cov_options = self.cov_options class InferenceResults(metaclass=abc.ABCMeta): diff --git a/econml/iv/dml/_dml.py b/econml/iv/dml/_dml.py index 12e690e34..58641a94a 100644 --- a/econml/iv/dml/_dml.py +++ b/econml/iv/dml/_dml.py @@ -157,7 +157,7 @@ def fit(self, Y, T, X=None, W=None, Z=None, nuisances=None, XT_res = self._combine(X, T_res) XZ_res = self._combine(X, Z_res) filtered_kwargs = filter_none_kwargs(sample_weight=sample_weight, - freq_weight=freq_weight, sample_var=sample_var) + freq_weight=freq_weight, sample_var=sample_var, groups=groups) self._model_final.fit(XZ_res, XT_res, Y_res, **filtered_kwargs) @@ -376,7 +376,9 @@ def __init__(self, *, mc_iters=None, mc_agg='mean', random_state=None, - allow_missing=False): + allow_missing=False, + cov_type="HC0", + cov_options=None): self.model_y_xw = clone(model_y_xw, safe=False) self.model_t_xw = clone(model_t_xw, safe=False) self.model_t_xwz = clone(model_t_xwz, safe=False) @@ -384,6 +386,8 @@ def __init__(self, *, self.projection = projection self.featurizer = clone(featurizer, safe=False) self.fit_cate_intercept = fit_cate_intercept + self.cov_type = cov_type + self.cov_options = cov_options if cov_options is not None else {} super().__init__(discrete_outcome=discrete_outcome, discrete_instrument=discrete_instrument, @@ -403,7 +407,7 @@ def _gen_featurizer(self): return clone(self.featurizer, safe=False) def _gen_model_final(self): - return StatsModels2SLS(cov_type="HC0") + return StatsModels2SLS(cov_type=self.cov_type, cov_options=self.cov_options) def _gen_ortho_learner_model_final(self): return _OrthoIVModelFinal(self._gen_model_final(), self._gen_featurizer(), self.fit_cate_intercept) diff --git a/econml/sklearn_extensions/linear_model.py b/econml/sklearn_extensions/linear_model.py index 9bc5529f7..17ddb3d6d 100644 --- a/econml/sklearn_extensions/linear_model.py +++ b/econml/sklearn_extensions/linear_model.py @@ -1693,21 +1693,29 @@ class StatsModelsLinearRegression(_StatsModelsWrapper): fit_intercept : bool, default True Whether to fit an intercept in this model cov_type : string, default "HC0" - The covariance approach to use. Supported values are "HCO", "HC1", and "nonrobust". + The covariance approach to use. Supported values are "HC0", "HC1", "nonrobust", and "clustered". + cov_options : dict, optional + Additional options for covariance estimation. For clustered covariance, supports: + - 'group_correction': bool, default True. Whether to apply N_G/(N_G-1) correction. + - 'df_correction': bool, default True. Whether to apply (N-1)/(N-K) correction. enable_federation : bool, default False Whether to enable federation (aggregating this model's results with other models in a distributed setting). This requires additional memory proportional to the number of columns in X to the fourth power. 
""" - def __init__(self, fit_intercept=True, cov_type="HC0", *, enable_federation=False): + def __init__(self, fit_intercept=True, cov_type="HC0", cov_options=None, *, enable_federation=False): self.cov_type = cov_type + self.cov_options = cov_options if cov_options is not None else {} + if cov_type == 'clustered': + self.cov_options.setdefault('group_correction', True) + self.cov_options.setdefault('df_correction', True) self.fit_intercept = fit_intercept self.enable_federation = enable_federation - def _check_input(self, X, y, sample_weight, freq_weight, sample_var): + def _check_input(self, X, y, sample_weight, freq_weight, sample_var, groups=None): """Check dimensions and other assertions.""" - X, y, sample_weight, freq_weight, sample_var = check_input_arrays( - X, y, sample_weight, freq_weight, sample_var, dtype='numeric') + X, y, sample_weight, freq_weight, sample_var, groups = check_input_arrays( + X, y, sample_weight, freq_weight, sample_var, groups, dtype='numeric') if X is None: X = np.empty((y.shape[0], 0)) if self.fit_intercept: @@ -1720,6 +1728,8 @@ def _check_input(self, X, y, sample_weight, freq_weight, sample_var): freq_weight = np.ones(y.shape[0]) if sample_var is None: sample_var = np.zeros(y.shape) + if groups is None: + groups = np.arange(y.shape[0]) # check freq_weight should be integer and should be accompanied by sample_var if np.any(np.not_equal(np.mod(freq_weight, 1), 0)): @@ -1753,7 +1763,7 @@ def _check_input(self, X, y, sample_weight, freq_weight, sample_var): # check array shape assert (X.shape[0] == y.shape[0] == sample_weight.shape[0] == - freq_weight.shape[0] == sample_var.shape[0]), "Input lengths not compatible!" + freq_weight.shape[0] == sample_var.shape[0] == groups.shape[0]), "Input lengths not compatible!" if y.ndim >= 2: assert (y.ndim == sample_var.ndim and y.shape[1] == sample_var.shape[1]), "Input shapes not compatible: {}, {}!".format( @@ -1767,9 +1777,9 @@ def _check_input(self, X, y, sample_weight, freq_weight, sample_var): else: weighted_y = y * np.sqrt(sample_weight).reshape(-1, 1) sample_var = sample_var * (sample_weight.reshape(-1, 1)) - return weighted_X, weighted_y, freq_weight, sample_var + return weighted_X, weighted_y, freq_weight, sample_var, groups - def fit(self, X, y, sample_weight=None, freq_weight=None, sample_var=None): + def fit(self, X, y, sample_weight=None, freq_weight=None, sample_var=None, groups=None): """ Fits the model. @@ -1788,13 +1798,15 @@ def fit(self, X, y, sample_weight=None, freq_weight=None, sample_var=None): sample_var : {(N,), (N, p)} nd array_like or None Variance of the outcome(s) of the original freq_weight[i] observations that were used to compute the mean outcome represented by observation i. + groups : (N,) array_like or None + Group labels for clustered standard errors. Returns ------- self : StatsModelsLinearRegression """ # TODO: Add other types of covariance estimation (e.g. 
Newey-West (HAC), HC2, HC3) - X, y, freq_weight, sample_var = self._check_input(X, y, sample_weight, freq_weight, sample_var) + X, y, freq_weight, sample_var, groups = self._check_input(X, y, sample_weight, freq_weight, sample_var, groups) WX = X * np.sqrt(freq_weight).reshape(-1, 1) @@ -1840,6 +1852,32 @@ def fit(self, X, y, sample_weight=None, freq_weight=None, sample_var=None): self.XXXy = np.einsum('nx,ny->yx', WX, wy) self.XXXX = np.einsum('nw,nx->wx', WX, WX) self.sample_var = np.average(sv, weights=freq_weight, axis=0) * n_obs + elif self.cov_type == 'clustered': + group_ids, inverse_idx = np.unique(groups, return_inverse=True) + n_groups = len(group_ids) + k = WX.shape[1] + + S_local = np.einsum('ni,nj->nij', WX, X) # (N, k, k) + S_flat = S_local.reshape(S_local.shape[0], -1) # (N, k*k) + group_S_flat = np.zeros((n_groups, k * k)) + np.add.at(group_S_flat, inverse_idx, S_flat) + group_S = group_S_flat.reshape(n_groups, k, k) # (G, k, k) + + y2d = y.reshape(-1, 1) if y.ndim < 2 else y # (N, p) + TY_local = y2d[:, :, None] * WX[:, None, :] # (N, p, k) + TY_flat = TY_local.reshape(TY_local.shape[0], -1) # (N, p*k) + group_T_flat = np.zeros((n_groups, y2d.shape[1] * k)) + np.add.at(group_T_flat, inverse_idx, TY_flat) + group_t = group_T_flat.reshape(n_groups, y2d.shape[1], k).transpose(1, 0, 2) # (p, G, k) + + TT = np.einsum('ygk,ygl->ykl', group_t, group_t) # (p, k, k) + ST = np.einsum('gvw,ygx->yvwx', group_S, group_t) # (p, k, k, k) + SS = np.einsum('gvu,gwx->vuwx', group_S, group_S) # (k, k, k, k) + + self.CL_TT = TT + self.CL_ST = ST + self.CL_SS = SS + self._n_groups = n_groups sigma_inv = np.linalg.pinv(self.XX) @@ -1871,8 +1909,17 @@ def fit(self, X, y, sample_weight=None, freq_weight=None, sample_var=None): for j in range(self._n_out): weighted_sigma = np.matmul(WX.T, WX * var_i[:, [j]]) self._var.append(correction * np.matmul(sigma_inv, np.matmul(weighted_sigma, sigma_inv))) + elif (self.cov_type == 'clustered'): + f_weight = np.sqrt(freq_weight) if y.ndim < 2 else np.sqrt(freq_weight).reshape(-1, 1) + centered_y = y - np.matmul(X, param) + self._var = self._compute_clustered_variance_linear( + WX, + centered_y * f_weight, + sigma_inv, + groups + ) else: - raise AttributeError("Unsupported cov_type. Must be one of nonrobust, HC0, HC1.") + raise AttributeError("Unsupported cov_type. Must be one of nonrobust, HC0, HC1, clustered.") self._param_var = np.array(self._var) @@ -1896,24 +1943,23 @@ def aggregate(models: List[StatsModelsLinearRegression]): if len(models) == 0: raise ValueError("Must aggregate at least one model!") cov_types = set([model.cov_type for model in models]) + cov_options = set([str(model.cov_options) for model in models]) fit_intercepts = set([model.fit_intercept for model in models]) enable_federation = set([model.enable_federation for model in models]) _n_outs = set([model._n_out for model in models]) assert enable_federation == {True}, "All models must have enable_federation=True!" assert len(cov_types) == 1, "All models must have the same cov_type!" + assert len(cov_options) == 1, "All models must have the same cov_options!" assert len(fit_intercepts) == 1, "All models must have the same fit_intercept!" assert len(_n_outs) == 1, "All models must have the same number of outcomes!" 
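+ # Clustered-federation note: the meat of the cluster sandwich, sum_g s_g s_g' with + # per-cluster scores s_g = t_g - S_g @ beta, depends on the pooled beta, which no + # single model knows at fit time. Each model therefore ships the beta-free cluster + # moments CL_TT (= sum_g t_g t_g'), CL_ST and CL_SS, and the clustered branch below + # reassembles TT - cross_left - cross_right + quad once the aggregate beta is available.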
- agg_model = StatsModelsLinearRegression(cov_type=models[0].cov_type, fit_intercept=models[0].fit_intercept) + agg_model = StatsModelsLinearRegression(cov_type=models[0].cov_type, + cov_options=models[0].cov_options, + fit_intercept=models[0].fit_intercept) agg_model._n_out = models[0]._n_out XX = np.sum([model.XX for model in models], axis=0) Xy = np.sum([model.Xy for model in models], axis=0) - XXyy = np.sum([model.XXyy for model in models], axis=0) - XXXy = np.sum([model.XXXy for model in models], axis=0) - XXXX = np.sum([model.XXXX for model in models], axis=0) - - sample_var = np.sum([model.sample_var for model in models], axis=0) n_obs = np.sum([model._n_obs for model in models], axis=0) sigma_inv = np.linalg.pinv(XX) @@ -1930,30 +1976,125 @@ def aggregate(models: List[StatsModelsLinearRegression]): else: # both HC1 and nonrobust use the same correction factor correction = (n_obs / (n_obs - df)) - if agg_model.cov_type in ['HC0', 'HC1']: - weighted_sigma = XXyy - 2 * np.einsum('yvwx,vy->ywx', XXXy, param) + \ - np.einsum('uvwx,uy,vy->ywx', XXXX, param, param) + sample_var - if agg_model._n_out == 0: - agg_model._var = correction * np.matmul(sigma_inv, np.matmul(weighted_sigma.squeeze(0), sigma_inv)) + (agg_model.XX, agg_model.Xy, agg_model._n_obs) = (XX, Xy, n_obs) + + if agg_model.cov_type == 'clustered': + TT = np.sum([m.CL_TT for m in models], axis=0) # (p, k, k) + ST = np.sum([m.CL_ST for m in models], axis=0) # (p, k, k, k) + SS = np.sum([m.CL_SS for m in models], axis=0) # (k, k, k, k) + G = int(np.sum([m._n_groups for m in models])) # total clusters + + (agg_model.CL_TT, agg_model.CL_ST, agg_model.CL_SS, agg_model._n_groups) = (TT, ST, SS, G) + + if G <= 1: + warnings.warn("Number of clusters <= 1. Using biased clustered variance calculation!") + group_correction = 1.0 else: - agg_model._var = [correction * np.matmul(sigma_inv, np.matmul(ws, sigma_inv)) for ws in weighted_sigma] + group_correction = (G / (G - 1)) if agg_model.cov_options['group_correction'] else 1.0 + + df_correction = ((n_obs - 1) / (n_obs - df)) if agg_model.cov_options['df_correction'] else 1.0 + correction_clustered = group_correction * df_correction + + param_T = param.T # (p, k) + # subtract cross terms of t_g and S_g @ beta + cross_tmp = np.einsum('yvwu,yw->yvu', ST, param_T) # (p, k, k) with axes (y, v, u) + cross_left = np.swapaxes(cross_tmp, 1, 2) # (p, k, k) with axes (y, u, v) + cross_right = np.transpose(cross_left, (0, 2, 1)) # (p, k, k) + # add quadratic term for (S_g @ beta)(S_g @ beta)^T + quad = np.einsum('uvwx,yw,yx->yuv', + np.transpose(SS, (0, 2, 1, 3)), + param_T, + param_T) + S = TT - cross_left - cross_right + quad # (p, k, k) - else: - assert agg_model.cov_type == 'nonrobust' or agg_model.cov_type is None - sigma = XXyy - 2 * np.einsum('yx,xy->y', XXXy, param) + np.einsum('wx,wy,xy->y', XXXX, param, param) - var_i = (sample_var + sigma) / n_obs if agg_model._n_out == 0: - agg_model._var = correction * var_i * sigma_inv + V = correction_clustered * (sigma_inv @ S.squeeze(0) @ sigma_inv) + agg_model._var = V else: - agg_model._var = [correction * var * sigma_inv for var in var_i] + agg_model._var = [correction_clustered * (sigma_inv @ S[j] @ sigma_inv) for j in range(S.shape[0])] + agg_model._param_var = np.array(agg_model._var) + else: + assert agg_model.cov_type in ['HC0', 'HC1', 'nonrobust', None] + XXyy = np.sum([model.XXyy for model in models], axis=0) + XXXy = np.sum([model.XXXy for model in models], axis=0) + XXXX = np.sum([model.XXXX for model in models], axis=0) + sample_var = 
np.sum([model.sample_var for model in models], axis=0) + + (agg_model.sample_var, agg_model.XXyy, agg_model.XXXy, agg_model.XXXX) = sample_var, XXyy, XXXy, XXXX + + if agg_model.cov_type in ['HC0', 'HC1']: + weighted_sigma = XXyy - 2 * np.einsum('yvwx,vy->ywx', XXXy, param) + \ + np.einsum('uvwx,uy,vy->ywx', XXXX, param, param) + sample_var + matrices = [weighted_sigma.squeeze(0)] if agg_model._n_out == 0 else list(weighted_sigma) + agg_model._var = [correction * np.matmul(sigma_inv, np.matmul(ws, sigma_inv)) + for ws in matrices] + else: # non-robust + sigma = XXyy - 2 * np.einsum('yx,xy->y', XXXy, param) + np.einsum('wx,wy,xy->y', XXXX, param, param) + var_i = (sample_var + sigma) / n_obs + matrices = [var_i] if agg_model._n_out == 0 else list(var_i) + agg_model._var = [correction * var * sigma_inv for var in matrices] - agg_model._param_var = np.array(agg_model._var) + if agg_model._n_out == 0: + agg_model._var = agg_model._var[0] - (agg_model.XX, agg_model.Xy, agg_model.XXyy, agg_model.XXXy, agg_model.XXXX, - agg_model.sample_var, agg_model._n_obs) = XX, Xy, XXyy, XXXy, XXXX, sample_var, n_obs + agg_model._param_var = np.array(agg_model._var) return agg_model + def _compute_clustered_variance_linear(self, WX, eps_i, sigma_inv, groups): + """ + Compute clustered standard errors for linear regression. + + Parameters + ---------- + WX : array_like + Weighted design matrix + eps_i : array_like + Residuals + sigma_inv : array_like + Inverse of X.T @ X + groups : array_like + Group labels for clustering + + Returns + ------- + var : array_like or list + Clustered variance matrix + """ + n, k = WX.shape + group_ids, inverse_idx = np.unique(groups, return_inverse=True) + n_groups = len(group_ids) + + # cov_options may be reassigned after __init__ (e.g. by StatsModelsInference.prefit + # with an empty dict), so fall back to the documented defaults via .get + if n_groups <= 1: + warnings.warn("Number of clusters <= 1. Using biased clustered variance calculation!") + group_correction = 1.0 + else: + group_correction = (n_groups / (n_groups - 1)) if self.cov_options.get('group_correction', True) else 1.0 + + df_correction = ((n - 1) / (n - k)) if self.cov_options.get('df_correction', True) else 1.0 + correction = group_correction * df_correction + + if eps_i.ndim < 2: + # Single outcome case + WX_e = WX * eps_i.reshape(-1, 1) + group_sums = np.zeros((n_groups, k)) + np.add.at(group_sums, inverse_idx, WX_e) + s = group_sums.T @ group_sums + + return correction * np.matmul(sigma_inv, np.matmul(s, sigma_inv)) + else: + # Multiple outcome case + var_list = [] + for j in range(eps_i.shape[1]): + WX_e = WX * eps_i[:, [j]] + group_sums = np.zeros((n_groups, k)) + np.add.at(group_sums, inverse_idx, WX_e) + s = group_sums.T @ group_sums + + var_list.append(correction * np.matmul(sigma_inv, np.matmul(s, sigma_inv))) + + return var_list + class StatsModelsRLM(_StatsModelsWrapper): """ @@ -2040,23 +2181,36 @@ class StatsModels2SLS(_StatsModelsWrapper): Parameters ---------- - cov_type : {'HC0', 'HC1', 'nonrobust', or None}, default 'HC0' - Indicates how the covariance matrix is estimated. + cov_type : {'HC0', 'HC1', 'nonrobust', 'clustered', or None}, default 'HC0' + Indicates how the covariance matrix is estimated. 'clustered' requires groups to be provided in fit(). + cov_options : dict, optional + Additional options for covariance estimation. For clustered covariance, supports: + - 'group_correction': bool, default True. Whether to apply N_G/(N_G-1) correction. + - 'df_correction': bool, default True. Whether to apply (N-1)/(N-K) correction.
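+ These options only take effect when cov_type='clustered'; they are ignored for the other covariance types.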
""" - def __init__(self, cov_type="HC0"): + def __init__(self, cov_type="HC0", cov_options=None): self.fit_intercept = False self.cov_type = cov_type + self.cov_options = cov_options if cov_options is not None else {} + if cov_type == 'clustered': + self.cov_options.setdefault('group_correction', True) + self.cov_options.setdefault('df_correction', True) return - def _check_input(self, Z, T, y, sample_weight): + def _check_input(self, Z, T, y, sample_weight, groups=None): """Check dimensions and other assertions.""" # set default values for None if sample_weight is None: sample_weight = np.ones(y.shape[0]) + if groups is None: + groups = np.arange(y.shape[0]) + else: + groups = np.asarray(groups) # check array shape - assert (T.shape[0] == Z.shape[0] == y.shape[0] == sample_weight.shape[0]), "Input lengths not compatible!" + assert (T.shape[0] == Z.shape[0] == y.shape[0] == sample_weight.shape[0] == groups.shape[0]), \ + "Input lengths not compatible!" # check dimension of instruments is more than dimension of treatments if Z.shape[1] < T.shape[1]: @@ -2073,9 +2227,9 @@ def _check_input(self, Z, T, y, sample_weight): weighted_y = y * np.sqrt(sample_weight) else: weighted_y = y * np.sqrt(sample_weight).reshape(-1, 1) - return weighted_Z, weighted_T, weighted_y + return weighted_Z, weighted_T, weighted_y, groups - def fit(self, Z, T, y, sample_weight=None, freq_weight=None, sample_var=None): + def fit(self, Z, T, y, sample_weight=None, freq_weight=None, sample_var=None, groups=None): """ Fits the model. @@ -2096,7 +2250,8 @@ def fit(self, Z, T, y, sample_weight=None, freq_weight=None, sample_var=None): sample_var : {(N,), (N, p)} nd array_like or None Variance of the outcome(s) of the original freq_weight[i] observations that were used to compute the mean outcome represented by observation i. - + groups : (N,) array_like or None + Group labels for clustered standard errors. Required when cov_type='clustered'. Returns ------- @@ -2105,7 +2260,7 @@ def fit(self, Z, T, y, sample_weight=None, freq_weight=None, sample_var=None): assert freq_weight is None, "freq_weight is not supported yet for this class!" assert sample_var is None, "sample_var is not supported yet for this class!" - Z, T, y = self._check_input(Z, T, y, sample_weight) + Z, T, y, groups = self._check_input(Z, T, y, sample_weight, groups) self._n_out = 0 if y.ndim < 2 else y.shape[1] @@ -2164,8 +2319,64 @@ def fit(self, Z, T, y, sample_weight=None, freq_weight=None, sample_var=None): weighted_sigma = np.matmul(that.T, that * var_i[:, [j]]) self._var.append(correction * np.matmul(thatT_that_inv, np.matmul(weighted_sigma, thatT_that_inv))) + elif (self.cov_type == 'clustered'): + self._var = self._compute_clustered_variance(that, y - np.dot(T, param), thatT_that_inv, groups) else: - raise AttributeError("Unsupported cov_type. Must be one of nonrobust, HC0, HC1.") + raise AttributeError("Unsupported cov_type. Must be one of nonrobust, HC0, HC1, clustered.") self._param_var = np.array(self._var) return self + + def _compute_clustered_variance(self, that, eps_i, thatT_that_inv, groups): + """ + Compute clustered standard errors. 
+ + Parameters + ---------- + that : array_like + Fitted values from first stage + eps_i : array_like + Residuals + thatT_that_inv : array_like + Inverse of that.T @ that + groups : array_like + Group labels for clustering + + Returns + ------- + var : array_like or list + Clustered variance matrix + """ + n, k = that.shape + group_ids, inverse_idx = np.unique(groups, return_inverse=True) + n_groups = len(group_ids) + + if n_groups <= 1: + warnings.warn("Number of clusters <= 1. Using biased clustered variance calculation!") + group_correction = 1.0 + else: + group_correction = (n_groups / (n_groups - 1)) if self.cov_options['group_correction'] else 1.0 + + df_correction = ((n - 1) / (n - k)) if self.cov_options['df_correction'] else 1.0 + correction = group_correction * df_correction + + if eps_i.ndim < 2: + # Single outcome case + that_e = that * eps_i.reshape(-1, 1) + group_sums = np.zeros((n_groups, k)) + np.add.at(group_sums, inverse_idx, that_e) + s = group_sums.T @ group_sums + + return correction * np.matmul(thatT_that_inv, np.matmul(s, thatT_that_inv)) + else: + # Multiple outcome case + var_list = [] + for j in range(eps_i.shape[1]): + that_e = that * eps_i[:, [j]] + group_sums = np.zeros((n_groups, k)) + np.add.at(group_sums, inverse_idx, that_e) + s = group_sums.T @ group_sums + + var_list.append(correction * np.matmul(thatT_that_inv, np.matmul(s, thatT_that_inv))) + + return var_list diff --git a/econml/tests/test_clustered_se.py b/econml/tests/test_clustered_se.py new file mode 100644 index 000000000..e18569ba6 --- /dev/null +++ b/econml/tests/test_clustered_se.py @@ -0,0 +1,348 @@ +# Copyright (c) PyWhy contributors. All rights reserved. +# Licensed under the MIT License. + +import unittest +import numpy as np +import pytest +from sklearn.linear_model import LassoCV, LogisticRegression +import statsmodels.api as sm +from econml.dml import DML +from econml.iv.dml import OrthoIV +from econml.utilities import shape +from econml.sklearn_extensions.linear_model import StatsModelsLinearRegression + + +@pytest.mark.cate_api +class TestClusteredSE(unittest.TestCase): + + def test_clustered_se_dml(self): + """Test that LinearDML works with clustered standard errors.""" + np.random.seed(123) + n = 500 + n_groups = 25 + + # Generate data with clustering structure + X = np.random.normal(0, 1, (n, 3)) + W = np.random.normal(0, 1, (n, 2)) + groups = np.random.randint(0, n_groups, n) + T = np.random.binomial(1, 0.5, n) + + # Add group-level effects to create clustering + group_effects = np.random.normal(0, 1, n_groups) + Y = X[:, 0] + 2 * T + group_effects[groups] + np.random.normal(0, 0.5, n) + + # Test DML with clustered standard errors via custom model_final + est = DML(model_y=LassoCV(), model_t=LogisticRegression(), + model_final=StatsModelsLinearRegression(fit_intercept=False, cov_type='clustered'), + discrete_treatment=True) + est.fit(Y, T, X=X, W=W, groups=groups) + + # Test basic functionality + effects = est.effect(X[:10]) + self.assertEqual(shape(effects), (10,)) + + # Test confidence intervals + lb, ub = est.effect_interval(X[:10], alpha=0.05) + self.assertEqual(shape(lb), (10,)) + self.assertEqual(shape(ub), (10,)) + self.assertTrue(np.all(lb <= ub)) + + # Test that clustered SEs are different from non-clustered + est_regular = DML(model_y=LassoCV(), model_t=LogisticRegression(), + model_final=StatsModelsLinearRegression(fit_intercept=False, cov_type='nonrobust'), + discrete_treatment=True) + est_regular.fit(Y, T, X=X, W=W) + + lb_regular, ub_regular = 
est_regular.effect_interval(X[:10], alpha=0.05) + + # Confidence intervals should be different (not identical) + self.assertFalse(np.allclose(lb, lb_regular, atol=1e-10)) + self.assertFalse(np.allclose(ub, ub_regular, atol=1e-10)) + + def test_clustered_se_iv(self): + """Test that OrthoIV works with clustered standard errors.""" + np.random.seed(123) + n = 500 + n_groups = 25 + + # Generate data with clustering structure + X = np.random.normal(0, 1, (n, 3)) + W = np.random.normal(0, 1, (n, 2)) + groups = np.random.randint(0, n_groups, n) + Z = np.random.binomial(1, 0.5, n) + T = np.random.binomial(1, 0.5, n) + + # Add group-level effects to create clustering + group_effects = np.random.normal(0, 1, n_groups) + Y = X[:, 0] + 2 * T + group_effects[groups] + np.random.normal(0, 0.5, n) + + # Test OrthoIV with clustered standard errors + est = OrthoIV(discrete_treatment=True, discrete_instrument=True, + cov_type='clustered') + est.fit(Y, T, Z=Z, X=X, W=W, groups=groups) + + # Test basic functionality + effects = est.effect(X[:10]) + self.assertEqual(shape(effects), (10,)) + + # Test confidence intervals + lb, ub = est.effect_interval(X[:10], alpha=0.05) + self.assertEqual(shape(lb), (10,)) + self.assertEqual(shape(ub), (10,)) + self.assertTrue(np.all(lb <= ub)) + + # Test that clustered SEs are different from non-clustered + est_regular = OrthoIV(discrete_treatment=True, discrete_instrument=True, + cov_type='nonrobust') + est_regular.fit(Y, T, Z=Z, X=X, W=W) + + lb_regular, ub_regular = est_regular.effect_interval(X[:10], alpha=0.05) + + # Confidence intervals should be different (not identical) + self.assertFalse(np.allclose(lb, lb_regular, atol=1e-10)) + self.assertFalse(np.allclose(ub, ub_regular, atol=1e-10)) + + def test_clustered_se_without_groups_defaults_to_individual(self): + """Test that clustered SE without groups matches HC0 with adjustment factor.""" + np.random.seed(123) + n = 100 + X = np.random.normal(0, 1, (n, 2)) + T = np.random.binomial(1, 0.5, n) + Y = np.random.normal(0, 1, n) + + # Clustered SE with default corrections (both enabled) + np.random.seed(123) + est_clustered = DML(model_y=LassoCV(), model_t=LogisticRegression(), + model_final=StatsModelsLinearRegression(fit_intercept=False, cov_type='clustered'), + discrete_treatment=True) + est_clustered.fit(Y, T, X=X) + + # HC0 for comparison + np.random.seed(123) + est_hc0 = DML(model_y=LassoCV(), model_t=LogisticRegression(), + model_final=StatsModelsLinearRegression(fit_intercept=False, cov_type='HC0'), + discrete_treatment=True) + est_hc0.fit(Y, T, X=X) + + # Get confidence intervals + X_test = X[:5] + lb_clustered, ub_clustered = est_clustered.effect_interval(X_test, alpha=0.05) + lb_hc0, ub_hc0 = est_hc0.effect_interval(X_test, alpha=0.05) + + # With both corrections: sqrt(n/(n-1)) * sqrt((n-1)/(n-k)) = sqrt(n/(n-k)) + # Get k from the fitted model (includes treatment variable) + k_params = est_clustered.model_final_.coef_.shape[0] + correction_factor = np.sqrt(n / (n - k_params)) + expected_width = (ub_hc0 - lb_hc0) * correction_factor + actual_width = ub_clustered - lb_clustered + np.testing.assert_allclose(actual_width, expected_width, rtol=1e-10) + + # Test basic functionality still works + effects = est_clustered.effect(X_test) + self.assertEqual(shape(effects), (5,)) + self.assertTrue(np.all(np.isfinite(effects))) + self.assertTrue(np.all(np.isfinite(lb_clustered))) + self.assertTrue(np.all(np.isfinite(ub_clustered))) + + def test_clustered_se_matches_statsmodels(self): + """Test that our final stage clustered 
SE matches statsmodels exactly.""" + np.random.seed(42) + n = 200 + n_groups = 20 + + # Generate simple data for direct comparison with clustering + X = np.random.normal(0, 1, (n, 2)) + groups = np.random.randint(0, n_groups, n) + group_effects = np.random.normal(0, 0.5, n_groups) + Y = 1 + 2 * X[:, 0] + 3 * X[:, 1] + group_effects[groups] + np.random.normal(0, 0.5, n) + + # Fit with our StatsModelsLinearRegression directly + X_with_intercept = np.column_stack([np.ones(n), X]) + econml_model = StatsModelsLinearRegression(cov_type='clustered') + econml_model.fit(X, Y, groups=groups) + econml_se = econml_model.coef_stderr_[0] # SE for X[:, 0] coefficient + + # Fit equivalent model with statsmodels + sm_model = sm.OLS(Y, X_with_intercept).fit(cov_type='cluster', cov_kwds={'groups': groups}) + sm_se = sm_model.bse[1] # SE for X[:, 0] coefficient + + # Statsmodels applies both G/(G-1) and (N-1)/(N-K) corrections by default + # Our implementation also applies both by default, so they should match + relative_diff = abs(econml_se - sm_se) / sm_se + self.assertLess(relative_diff, 1e-4, + f"EconML SE ({econml_se:.8f}) differs from statsmodels SE ({sm_se:.8f})") + + def test_clustered_micro_equals_aggregated(self): + """Test that clustered SE matches for summarized and non-summarized data.""" + + def _generate_micro_and_aggregated(rng, *, n_groups=12, cells_per_group=6, d=4, p=1): + """Build a micro dataset and aggregated counterpart with many freq > 1.""" + G = n_groups + K = cells_per_group + N = G * K + + # Design + X = rng.normal(size=(N, d)) + # True coefficients used just to generate data; intercept will be fit by the model + beta_true = rng.normal(size=(d + 1, p)) + + # Positive sample weights and integer freq weights with many freq > 1 + sw = np.exp(rng.normal(scale=0.3, size=N)) + freq = rng.integers(1, 6, size=N) # values in {1,2,3,4,5} + + # Group labels + groups = np.repeat(np.arange(G), K) + + # Build micro outcomes y_{ij} + ybar = np.zeros((N, p), dtype=float) + svar = np.zeros((N, p), dtype=float) + + X_micro, y_micro, sw_micro, groups_micro = [], [], [], [] + + for i in range(N): + f = int(freq[i]) + x_i = X[i] + mu_i = np.concatenate(([1.0], x_i)) @ beta_true # shape (p,) + eps = rng.normal(scale=1.0, size=(f, p)) + y_ij = mu_i + eps # shape (f, p) + + X_micro.append(np.repeat(x_i[None, :], f, axis=0)) + y_micro.append(y_ij) + sw_micro.append(np.repeat(sw[i], f)) + groups_micro.append(np.repeat(groups[i], f)) + + ybar[i, :] = y_ij.mean(axis=0) + svar[i, :] = y_ij.var(axis=0, ddof=0) + + X_micro = np.vstack(X_micro) + y_micro = np.vstack(y_micro) + sw_micro = np.concatenate(sw_micro) + groups_micro = np.concatenate(groups_micro) + + if p == 1: + ybar = ybar.ravel() + svar = svar.ravel() + y_micro = y_micro.ravel() + + return (X, ybar, sw, freq, svar, groups), (X_micro, y_micro, sw_micro, groups_micro) + + rng = np.random.default_rng(7) + for p in [1, 3]: + (X, ybar, sw, freq, svar, groups), (X_micro, y_micro, sw_micro, groups_micro) = \ + _generate_micro_and_aggregated(rng, n_groups=10, cells_per_group=7, d=5, p=p) + + # Disable DF correction since n differs between aggregated and micro datasets + cov_opts = {'group_correction': True, 'df_correction': False} + m_agg = StatsModelsLinearRegression(fit_intercept=True, cov_type="clustered", + cov_options=cov_opts, enable_federation=False) + m_agg.fit(X, ybar, sample_weight=sw, freq_weight=freq, sample_var=svar, groups=groups) + + m_micro = StatsModelsLinearRegression(fit_intercept=True, cov_type="clustered", + cov_options=cov_opts, 
enable_federation=False) + m_micro.fit( + X_micro, + y_micro, + sample_weight=sw_micro, + freq_weight=None, + sample_var=None, + groups=groups_micro + ) + + np.testing.assert_allclose(m_agg._param, m_micro._param, rtol=1e-12, atol=1e-12) + np.testing.assert_allclose(np.array(m_agg._param_var), np.array(m_micro._param_var), + rtol=1e-10, atol=1e-12) + + def test_clustered_correction_factors(self): + """Test that correction factors are applied correctly.""" + np.random.seed(42) + n = 200 + n_groups = 20 + X = np.random.randn(n, 3) + groups = np.repeat(np.arange(n_groups), n // n_groups) + y = X[:, 0] + 0.5 * X[:, 1] + np.random.randn(n) * 0.5 + + # Fit models with different correction options + m_none = StatsModelsLinearRegression( + cov_type='clustered', + cov_options={'group_correction': False, 'df_correction': False} + ).fit(X, y, groups=groups) + + m_group = StatsModelsLinearRegression( + cov_type='clustered', + cov_options={'group_correction': True, 'df_correction': False} + ).fit(X, y, groups=groups) + + m_df = StatsModelsLinearRegression( + cov_type='clustered', + cov_options={'group_correction': False, 'df_correction': True} + ).fit(X, y, groups=groups) + + m_both = StatsModelsLinearRegression( + cov_type='clustered', + cov_options={'group_correction': True, 'df_correction': True} + ).fit(X, y, groups=groups) + + # Get actual number of parameters + k_params = len(m_none.coef_) + 1 + + # Verify group correction + group_ratio = m_group.coef_stderr_ / m_none.coef_stderr_ + expected_group_ratio = np.sqrt(n_groups / (n_groups - 1)) + np.testing.assert_allclose(group_ratio, expected_group_ratio, rtol=1e-10) + + # Verify DF correction + df_ratio = m_df.coef_stderr_ / m_none.coef_stderr_ + expected_df_ratio = np.sqrt((n - 1) / (n - k_params)) + np.testing.assert_allclose(df_ratio, expected_df_ratio, rtol=1e-10) + + # Verify combined correction + combined_ratio = m_both.coef_stderr_ / m_none.coef_stderr_ + expected_combined_ratio = np.sqrt(n_groups / (n_groups - 1) * (n - 1) / (n - k_params)) + np.testing.assert_allclose(combined_ratio, expected_combined_ratio, rtol=1e-10) + + # Verify multiplicative property + both_from_components = m_group.coef_stderr_ * m_df.coef_stderr_ / m_none.coef_stderr_ + np.testing.assert_allclose(m_both.coef_stderr_, both_from_components, rtol=1e-10) + + def test_clustered_se_federated(self): + """Test federated calculation of clustered standard errors.""" + np.random.seed(42) + n = 300 + n_groups = 30 + X = np.random.randn(n, 3) + groups = np.repeat(np.arange(n_groups), n // n_groups) + group_effects = np.random.randn(n_groups) * 0.5 + y = 1 + 2 * X[:, 0] + 3 * X[:, 1] - X[:, 2] + group_effects[groups] + np.random.randn(n) * 0.5 + + # Baseline: fit on full data + baseline = StatsModelsLinearRegression( + cov_type='clustered', + enable_federation=False + ).fit(X, y, groups=groups) + + # Split data into chunks at group boundaries + n_chunks = 3 + groups_per_chunk = n_groups // n_chunks + models = [] + for i in range(n_chunks): + start_group = i * groups_per_chunk + end_group = (i + 1) * groups_per_chunk if i < n_chunks - 1 else n_groups + mask = (groups >= start_group) & (groups < end_group) + X_chunk = X[mask] + y_chunk = y[mask] + groups_chunk = groups[mask] + + model = StatsModelsLinearRegression( + cov_type='clustered', + enable_federation=True + ).fit(X_chunk, y_chunk, groups=groups_chunk) + models.append(model) + + # Aggregate models + aggregated = StatsModelsLinearRegression.aggregate(models) + + # Compare coefficients and standard errors + 
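# The sufficient statistics (XX, Xy, CL_TT, CL_ST, CL_SS) are additive across + # chunks, and because the split respects group boundaries no cluster spans two + # chunks, so aggregation should reproduce the pooled fit to floating-point precision. + 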
np.testing.assert_allclose(aggregated.coef_, baseline.coef_, rtol=1e-10) + np.testing.assert_allclose(aggregated.intercept_, baseline.intercept_, rtol=1e-10) + np.testing.assert_allclose(aggregated.coef_stderr_, baseline.coef_stderr_, rtol=1e-10) + np.testing.assert_allclose(aggregated.intercept_stderr_, baseline.intercept_stderr_, rtol=1e-10)
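For review convenience, a minimal usage sketch of the API this patch adds, assuming the diff above is applied (class and argument names as in the diff; the data-generating process is illustrative only):

    import numpy as np
    from econml.sklearn_extensions.linear_model import StatsModelsLinearRegression

    rng = np.random.default_rng(0)
    n, n_groups = 400, 40
    X = rng.normal(size=(n, 3))
    groups = rng.integers(0, n_groups, size=n)
    # group-level shocks induce within-cluster correlation in the outcome
    y = X @ np.array([1.0, 2.0, -0.5]) + rng.normal(size=n_groups)[groups] + rng.normal(size=n)

    m = StatsModelsLinearRegression(cov_type='clustered',
                                    cov_options={'group_correction': True, 'df_correction': True})
    m.fit(X, y, groups=groups)
    print(m.coef_, m.coef_stderr_)  # point estimates with cluster-robust standard errors

    # The same option threads through OrthoIV (see econml/iv/dml/_dml.py above):
    # est = OrthoIV(discrete_treatment=True, discrete_instrument=True, cov_type='clustered')
    # est.fit(Y, T, Z=Z, X=X, W=W, groups=groups)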