From c43869d54fe247aee6c7c63b580ebd9e7f795a76 Mon Sep 17 00:00:00 2001 From: TomeHirata Date: Wed, 24 Sep 2025 20:49:27 +0900 Subject: [PATCH 1/2] add e2e for simple estimators --- tests/test_adjusted_estimator.py | 115 ------------------ tests/test_simple_estimator.py | 192 ++++++++++++++++++++++++++++++- 2 files changed, 191 insertions(+), 116 deletions(-) delete mode 100644 tests/test_adjusted_estimator.py diff --git a/tests/test_adjusted_estimator.py b/tests/test_adjusted_estimator.py deleted file mode 100644 index 31daa0e..0000000 --- a/tests/test_adjusted_estimator.py +++ /dev/null @@ -1,115 +0,0 @@ -import unittest -from unittest.mock import patch -import numpy as np -from dte_adj import AdjustedDistributionEstimator -from unittest.mock import MagicMock - - -class TestAdjustedEstimator(unittest.TestCase): - def setUp(self): - base_model = MagicMock() - base_model.predict_proba.side_effect = lambda x, y: x - self.estimator = AdjustedDistributionEstimator(base_model, folds=2) - self.covariates = np.zeros((20, 5)) - self.treatment_arms = np.hstack([np.zeros(10), np.ones(10)]) - self.outcomes = np.arange(20) - self.estimator.fit(self.covariates, self.treatment_arms, self.outcomes) - - def test_init_fail_incorrect_base_model(self): - # Act, Assert - with self.assertRaises(ValueError) as cm: - AdjustedDistributionEstimator("dummy") - self.assertEqual( - str(cm.exception), - "Base model should implement either predict_proba or predict", - ) - - def test_predict_fail_before_fit(self): - # Arrange - D = np.zeros(20) - D[:10] = 1 - Y = np.arange(20) - base_model = MagicMock() - subject = AdjustedDistributionEstimator(base_model) - - # Act, Assert - with self.assertRaises(ValueError) as cm: - subject.predict(D, Y) - self.assertEqual( - str(cm.exception), - "This estimator has not been trained yet. 
Please call fit first", - ) - - def test_fit_fail_invalid_input(self): - # Arrange - X = np.arange(20) - D = np.zeros(10) - D[:10] = 1 - Y = np.arange(20) - base_model = MagicMock() - subject = AdjustedDistributionEstimator(base_model) - - # Act, Assert - with self.assertRaises(ValueError) as cm: - subject.fit(X, D, Y) - self.assertEqual( - str(cm.exception), - "The shape of covariates and treatment_arm should be same", - ) - - def test_compute_cumulative_distribution(self): - # Arrange - mock_model = self.estimator.base_model - mock_model.predict_proba.side_effect = lambda x: np.ones((len(x), 2)) * 0.5 - target_treatment_arm = 0 - locations = np.arange(10) - - # Act - with patch( - "numpy.random.randint", - return_value=np.array([0] * 5 + [1] * 5 + [0] * 5 + [1] * 5), - ): - cumulative_distribution, _, superset_prediction = ( - self.estimator._compute_cumulative_distribution( - target_treatment_arm, - locations, - self.covariates, - self.treatment_arms, - self.outcomes, - ) - ) - - # Assert - self.assertEqual(cumulative_distribution.shape, (10,)) - self.assertEqual(superset_prediction.shape, (20, 10)) - - for i in range(10): - self.assertAlmostEqual(cumulative_distribution[i], (i + 1) / 10, places=2) - - expected_result = np.array( - [ - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 
0.5, 0.5, 1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], - ] - ) - np.testing.assert_array_almost_equal( - superset_prediction, expected_result, decimal=2 - ) diff --git a/tests/test_simple_estimator.py b/tests/test_simple_estimator.py index 80e4928..e801d04 100644 --- a/tests/test_simple_estimator.py +++ b/tests/test_simple_estimator.py @@ -1,6 +1,47 @@ import unittest import numpy as np -from dte_adj import SimpleDistributionEstimator +from unittest.mock import patch, MagicMock +from sklearn.linear_model import LogisticRegression +from dte_adj import SimpleDistributionEstimator, AdjustedDistributionEstimator + + +def generate_data(n, d_x=100, rho=0.5): + """ + Generate data according to the described data generating process (DGP). + + Args: + n (int): Number of samples. + d_x (int): Number of covariates. Default is 100. + rho (float): Success probability for the Bernoulli distribution. Default is 0.5. + + Returns: + X (np.ndarray): Covariates matrix of shape (n, d_x). + D (np.ndarray): Treatment variable array of shape (n,). + Y (np.ndarray): Outcome variable array of shape (n,). 
+ """ + # Generate covariates X from a uniform distribution on (0, 1) + X = np.random.uniform(0, 1, (n, d_x)) + + # Generate treatment variable D from a Bernoulli distribution with success probability rho + D = np.random.binomial(1, rho, n) + + # Define beta_j and gamma_j according to the problem statement + beta = np.zeros(d_x) + gamma = np.zeros(d_x) + + # Set the first 50 values of beta and gamma to 1 + beta[:50] = 1 + gamma[:50] = 1 + + # Compute the outcome Y + U = np.random.normal(0, 1, n) # Error term + linear_term = np.dot(X, beta) + quadratic_term = np.dot(X**2, gamma) + + # Outcome equation + Y = 5 * D + linear_term + quadratic_term + U + + return X, D, Y class TestSimpleEstimator(unittest.TestCase): @@ -38,3 +79,152 @@ def test_fit_invalid_shapes(self): with self.assertRaises(ValueError): self.estimator.fit(self.covariates, self.treatment_arms, outcomes_invalid) + + +class TestAdjustedEstimator(unittest.TestCase): + def setUp(self): + base_model = MagicMock() + base_model.predict_proba.side_effect = lambda x, y: x + self.estimator = AdjustedDistributionEstimator(base_model, folds=2) + self.covariates = np.zeros((20, 5)) + self.treatment_arms = np.hstack([np.zeros(10), np.ones(10)]) + self.outcomes = np.arange(20) + self.estimator.fit(self.covariates, self.treatment_arms, self.outcomes) + + def test_init_fail_incorrect_base_model(self): + # Act, Assert + with self.assertRaises(ValueError) as cm: + AdjustedDistributionEstimator("dummy") + self.assertEqual( + str(cm.exception), + "Base model should implement either predict_proba or predict", + ) + + def test_predict_fail_before_fit(self): + # Arrange + D = np.zeros(20) + D[:10] = 1 + Y = np.arange(20) + base_model = MagicMock() + subject = AdjustedDistributionEstimator(base_model) + + # Act, Assert + with self.assertRaises(ValueError) as cm: + subject.predict(D, Y) + self.assertEqual( + str(cm.exception), + "This estimator has not been trained yet. 
Please call fit first", + ) + + def test_fit_fail_invalid_input(self): + # Arrange + X = np.arange(20) + D = np.zeros(10) + D[:10] = 1 + Y = np.arange(20) + base_model = MagicMock() + subject = AdjustedDistributionEstimator(base_model) + + # Act, Assert + with self.assertRaises(ValueError) as cm: + subject.fit(X, D, Y) + self.assertEqual( + str(cm.exception), + "The shape of covariates and treatment_arm should be same", + ) + + def test_compute_cumulative_distribution(self): + # Arrange + mock_model = self.estimator.base_model + mock_model.predict_proba.side_effect = lambda x: np.ones((len(x), 2)) * 0.5 + target_treatment_arm = 0 + locations = np.arange(10) + + # Act + with patch( + "numpy.random.randint", + return_value=np.array([0] * 5 + [1] * 5 + [0] * 5 + [1] * 5), + ): + cumulative_distribution, _, superset_prediction = ( + self.estimator._compute_cumulative_distribution( + target_treatment_arm, + locations, + self.covariates, + self.treatment_arms, + self.outcomes, + ) + ) + + # Assert + self.assertEqual(cumulative_distribution.shape, (10,)) + self.assertEqual(superset_prediction.shape, (20, 10)) + + for i in range(10): + self.assertAlmostEqual(cumulative_distribution[i], (i + 1) / 10, places=2) + + expected_result = np.array( + [ + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 
0.5, 0.5, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + [0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + ] + ) + np.testing.assert_array_almost_equal( + superset_prediction, expected_result, decimal=2 + ) + + +class TestE2E(unittest.TestCase): + def test_e2e(self): + # Arrange + X, D, Y = generate_data(n=1000) + locations = np.array([np.percentile(Y, p) for p in range(10, 91, 10)]) + simple_estimator = SimpleDistributionEstimator() + adjusted_estimator = AdjustedDistributionEstimator(LogisticRegression()) + + # Act + simple_estimator.fit(X, D, Y) + adjusted_estimator.fit(X, D, Y) + + simple_dte, simple_lower_bound, simple_upper_bound = ( + simple_estimator.predict_dte(1, 0, locations) + ) + adjusted_dte, adjusted_lower_bound, adjusted_upper_bound = ( + adjusted_estimator.predict_dte(1, 0, locations) + ) + + # Assert + np.testing.assert_(np.all(simple_dte < 0), "Not all values are negative") + np.testing.assert_(np.all(adjusted_dte < 0), "Not all values are negative") + np.testing.assert_( + np.all(simple_lower_bound < simple_upper_bound), + "Upper bound is less than lower bound", + ) + np.testing.assert_( + np.all(adjusted_lower_bound < adjusted_upper_bound), + "Upper bound is less than lower bound", + ) + np.testing.assert_( + np.all( + adjusted_upper_bound - adjusted_lower_bound + < simple_upper_bound - simple_lower_bound + ), + "Adjusted estimator does not have narrower intervals", + ) From cc0eb775b3a594a74c89c4e6adacfdfd9a513685 Mon Sep 17 00:00:00 2001 From: TomeHirata Date: Wed, 24 Sep 2025 23:14:13 +0900 Subject: [PATCH 2/2] Add e2e tests --- tests/test_local_estimators.py | 106 +++++++++++++- tests/test_simple_estimator.py | 2 + tests/test_stratified_estimators.py | 216 ++++++++++++++++++++++++++++ 3 files changed, 323 
insertions(+), 1 deletion(-) create mode 100644 tests/test_stratified_estimators.py diff --git a/tests/test_local_estimators.py b/tests/test_local_estimators.py index 164788c..58c7233 100644 --- a/tests/test_local_estimators.py +++ b/tests/test_local_estimators.py @@ -1,8 +1,72 @@ import unittest import numpy as np -from sklearn.linear_model import LogisticRegression +from sklearn.linear_model import LogisticRegression, LinearRegression from dte_adj import SimpleLocalDistributionEstimator, AdjustedLocalDistributionEstimator +np.random.seed(123) + + +def generate_data(n=1000, S=4): + # Generate W ~ U(0,1) + W = np.random.uniform(0, 1, n) + + # Assign strata based on W + strata = np.digitize(W, np.linspace(0, 1, S + 1)[1:]) + + # Generate X ~ N(0, I_20) + X = np.random.randn(n, 20) + + # Treatment assignment Z ~ Bernoulli(0.5) within each stratum + Z = np.zeros(n) + for s in range(S): + indices = np.where(strata == s)[0] + Z[indices] = np.random.binomial(1, 0.5, size=len(indices)) + + # Define functions b(X, W) and c(X, W) + def b(X, W): + return ( + np.sin(np.pi * X[:, 0] * X[:, 1]) + + 2 * (X[:, 2] - 0.5) ** 2 + + X[:, 3] + + 0.5 * X[:, 4] + + 0.1 * W + ) + + def c(X, W): + return 0.1 * (X[:, 0] + np.log(1 + np.exp(X[:, 1])) + W) + + # Define parameters + a1, a0 = 4, 1 + b1, b0 = 1, -1 + c1, c0 = 3, 3 + + # Generate errors + epsilon = np.random.randn(n) + + # Compute Y(d) + Y0 = a0 + b(X, W) + epsilon + Y1 = a1 + b(X, W) + epsilon + + # Compute D(0) and D(1) + D0 = (b0 + c(X, W) > c0 * epsilon).astype(int) + D1 = np.where(D0 == 0, (b1 + c(X, W) > c1 * epsilon).astype(int), 1) + + # Compute observed D and Y + D = D1 * Z + D0 * (1 - Z) + Y = Y1 * D + Y0 * (1 - D) + + # discrete + Y = np.random.poisson(np.abs(Y)) + + return { + "W": W, + "X": X, + "Z": Z, + "D": D, + "Y": Y, + "strata": strata, + } + class TestLocalEstimators(unittest.TestCase): def setUp(self): @@ -232,3 +296,43 @@ def test_adjusted_local_estimator_predict_lpte(self): 
self.assertTrue(np.all(lower_bound <= upper_bound)) self.assertTrue(np.all(lower_bound <= beta)) self.assertTrue(np.all(beta <= upper_bound)) + + +class TestE2E(unittest.TestCase): + def test_e2e(self): + # Arrange + data = generate_data(n=3000) + X, D, Y, Z, S = data["X"], data["W"], data["Y"], data["Z"], data["strata"] + locations = np.array([np.percentile(Y, p) for p in range(10, 91, 10)]) + simple_estimator = SimpleLocalDistributionEstimator() + adjusted_estimator = AdjustedLocalDistributionEstimator(LinearRegression()) + + # Act + simple_estimator.fit(X, Z, D, Y, S) + adjusted_estimator.fit(X, Z, D, Y, S) + + simple_dte, simple_lower_bound, simple_upper_bound = ( + simple_estimator.predict_dte(1, 0, locations) + ) + adjusted_dte, adjusted_lower_bound, adjusted_upper_bound = ( + adjusted_estimator.predict_dte(1, 0, locations) + ) + + # Assert + np.testing.assert_(np.all(simple_dte < 0), "Not all values are negative") + np.testing.assert_(np.all(adjusted_dte < 0), "Not all values are negative") + np.testing.assert_( + np.all(simple_lower_bound < simple_upper_bound), + "Upper bound is less than lower bound", + ) + np.testing.assert_( + np.all(adjusted_lower_bound < adjusted_upper_bound), + "Upper bound is less than lower bound", + ) + np.testing.assert_( + np.all( + adjusted_upper_bound - adjusted_lower_bound + < simple_upper_bound - simple_lower_bound + ), + "Adjusted estimator does not have narrower intervals", + ) diff --git a/tests/test_simple_estimator.py b/tests/test_simple_estimator.py index e801d04..928ef3e 100644 --- a/tests/test_simple_estimator.py +++ b/tests/test_simple_estimator.py @@ -4,6 +4,8 @@ from sklearn.linear_model import LogisticRegression from dte_adj import SimpleDistributionEstimator, AdjustedDistributionEstimator +np.random.seed(123) + def generate_data(n, d_x=100, rho=0.5): """ diff --git a/tests/test_stratified_estimators.py b/tests/test_stratified_estimators.py new file mode 100644 index 0000000..080b349 --- /dev/null +++ 
b/tests/test_stratified_estimators.py
@@ -0,0 +1,216 @@
+import unittest
+import numpy as np
+from sklearn.linear_model import LogisticRegression
+from dte_adj import (
+    SimpleStratifiedDistributionEstimator,
+    AdjustedStratifiedDistributionEstimator,
+)
+
+
+def generate_data(n=1000, S=4, d=20, discrete=False):
+    # d is the covariate dimension; b_X below reads X[:, 0..4], so d must be >= 5.
+
+    Z = np.random.uniform(0, 1, n)
+
+    S_i = np.digitize(Z, np.linspace(0, 1, S + 1)[1:-1])
+
+    X = np.random.multivariate_normal(mean=np.zeros(d), cov=np.eye(d), size=n)
+
+    W = np.zeros(n, dtype=int)
+    unique_strata = np.unique(S_i)
+    for s in unique_strata:
+        idx = np.where(S_i == s)[0]
+        n_s = len(idx)
+        W[idx[: n_s // 2]] = 1
+        W[idx] = np.random.permutation(W[idx])
+
+    b_X = (
+        np.sin(np.pi * X[:, 0] * X[:, 1])
+        + 2 * (X[:, 2] - 0.5) ** 2
+        + X[:, 3]
+        + 0.5 * X[:, 4]
+    )
+    c_X = 0.1 * (X[:, 0] + np.log(1 + np.exp(X[:, 1])))
+
+    gamma = 0.1
+    u = np.random.normal(0, 1, n)
+
+    Y = b_X + c_X * W + gamma * Z + u
+    if discrete:
+        Y = np.random.poisson(0.2 * np.abs(Y))
+
+    return {"W": W, "X": X, "Z": Z, "Y": Y, "strata": S_i}
+
+
+class TestStratifiedEstimators(unittest.TestCase):
+    def setUp(self):
+        np.random.seed(42)
+        data = generate_data(n=1000, S=4, d=20, discrete=False)
+        self.X = data["X"]
+        self.W = data["W"]
+        self.Y = data["Y"]
+        self.strata = data["strata"]
+        self.locations = np.linspace(self.Y.min(), self.Y.max(), 20)
+
+    def test_simple_stratified_estimator_fit(self):
+        estimator = SimpleStratifiedDistributionEstimator()
+        result = estimator.fit(self.X, self.W, self.Y, self.strata)
+
+        self.assertIsInstance(result, SimpleStratifiedDistributionEstimator)
+        self.assertTrue(np.array_equal(estimator.covariates, self.X))
+        self.assertTrue(np.array_equal(estimator.treatment_arms, self.W))
+        self.assertTrue(np.array_equal(estimator.outcomes, self.Y))
+        self.assertTrue(np.array_equal(estimator.strata, self.strata))
+
+    def test_simple_stratified_estimator_predict_dte(self):
+        estimator = SimpleStratifiedDistributionEstimator()
+        estimator.fit(self.X, 
self.W, self.Y, self.strata) + + dte, lower_bound, upper_bound = estimator.predict_dte( + target_treatment_arm=1, + control_treatment_arm=0, + locations=self.locations, + alpha=0.05, + ) + + self.assertEqual(dte.shape, self.locations.shape) + self.assertEqual(lower_bound.shape, self.locations.shape) + self.assertEqual(upper_bound.shape, self.locations.shape) + self.assertTrue(np.all(lower_bound <= dte)) + self.assertTrue(np.all(dte <= upper_bound)) + + def test_simple_stratified_estimator_predict_pte(self): + estimator = SimpleStratifiedDistributionEstimator() + estimator.fit(self.X, self.W, self.Y, self.strata) + + pte, lower_bound, upper_bound = estimator.predict_pte( + target_treatment_arm=1, + control_treatment_arm=0, + locations=self.locations, + alpha=0.05, + ) + + expected_length = len(self.locations) - 1 + self.assertEqual(pte.shape, (expected_length,)) + self.assertEqual(lower_bound.shape, (expected_length,)) + self.assertEqual(upper_bound.shape, (expected_length,)) + self.assertTrue(np.all(lower_bound <= upper_bound)) + + def test_simple_stratified_estimator_predict_qte(self): + estimator = SimpleStratifiedDistributionEstimator() + estimator.fit(self.X, self.W, self.Y, self.strata) + + quantiles = np.array([0.25, 0.5, 0.75]) + qte, lower_bound, upper_bound = estimator.predict_qte( + target_treatment_arm=1, + control_treatment_arm=0, + quantiles=quantiles, + n_bootstrap=50, + ) + + self.assertEqual(qte.shape, quantiles.shape) + self.assertEqual(lower_bound.shape, quantiles.shape) + self.assertEqual(upper_bound.shape, quantiles.shape) + self.assertTrue(np.all(lower_bound <= upper_bound)) + + def test_adjusted_stratified_estimator_fit(self): + base_model = LogisticRegression(max_iter=1000, random_state=42) + estimator = AdjustedStratifiedDistributionEstimator(base_model, folds=3) + result = estimator.fit(self.X, self.W, self.Y, self.strata) + + self.assertIsInstance(result, AdjustedStratifiedDistributionEstimator) + 
self.assertTrue(np.array_equal(estimator.covariates, self.X)) + self.assertTrue(np.array_equal(estimator.treatment_arms, self.W)) + self.assertTrue(np.array_equal(estimator.outcomes, self.Y)) + self.assertTrue(np.array_equal(estimator.strata, self.strata)) + self.assertEqual(estimator.folds, 3) + + def test_adjusted_stratified_estimator_predict_dte(self): + base_model = LogisticRegression(max_iter=1000, random_state=42) + estimator = AdjustedStratifiedDistributionEstimator(base_model, folds=3) + estimator.fit(self.X, self.W, self.Y, self.strata) + + dte, lower_bound, upper_bound = estimator.predict_dte( + target_treatment_arm=1, + control_treatment_arm=0, + locations=self.locations, + alpha=0.05, + variance_type="moment", + ) + + self.assertEqual(dte.shape, self.locations.shape) + self.assertEqual(lower_bound.shape, self.locations.shape) + self.assertEqual(upper_bound.shape, self.locations.shape) + self.assertTrue(np.all(lower_bound <= dte)) + self.assertTrue(np.all(dte <= upper_bound)) + + def test_adjusted_stratified_estimator_predict_pte(self): + base_model = LogisticRegression(max_iter=1000, random_state=42) + estimator = AdjustedStratifiedDistributionEstimator(base_model, folds=3) + estimator.fit(self.X, self.W, self.Y, self.strata) + + pte, lower_bound, upper_bound = estimator.predict_pte( + target_treatment_arm=1, + control_treatment_arm=0, + locations=self.locations, + alpha=0.05, + variance_type="moment", + ) + + expected_length = len(self.locations) - 1 + self.assertEqual(pte.shape, (expected_length,)) + self.assertEqual(lower_bound.shape, (expected_length,)) + self.assertEqual(upper_bound.shape, (expected_length,)) + self.assertTrue(np.all(lower_bound <= upper_bound)) + + def test_adjusted_stratified_estimator_predict_qte(self): + base_model = LogisticRegression(max_iter=1000, random_state=42) + estimator = AdjustedStratifiedDistributionEstimator(base_model, folds=3) + estimator.fit(self.X, self.W, self.Y, self.strata) + + quantiles = np.array([0.25, 
0.5, 0.75]) + qte, lower_bound, upper_bound = estimator.predict_qte( + target_treatment_arm=1, + control_treatment_arm=0, + quantiles=quantiles, + n_bootstrap=50, + ) + + self.assertEqual(qte.shape, quantiles.shape) + self.assertEqual(lower_bound.shape, quantiles.shape) + self.assertEqual(upper_bound.shape, quantiles.shape) + self.assertTrue(np.all(lower_bound <= upper_bound)) + + def test_discrete_outcomes(self): + data = generate_data(n=1000, S=4, d=20, discrete=True) + + estimator = SimpleStratifiedDistributionEstimator() + estimator.fit(data["X"], data["W"], data["Y"], data["strata"]) + + locations = np.arange(0, data["Y"].max() + 1) + dte, lower, upper = estimator.predict_dte(1, 0, locations) + + self.assertEqual(dte.shape, locations.shape) + self.assertTrue(np.all(lower <= upper)) + + def test_invalid_input_shapes(self): + estimator = SimpleStratifiedDistributionEstimator() + + X_wrong = self.X[:-10] + + with self.assertRaises(ValueError): + estimator.fit(X_wrong, self.W, self.Y, self.strata) + + def test_different_alpha_values(self): + estimator = SimpleStratifiedDistributionEstimator() + estimator.fit(self.X, self.W, self.Y, self.strata) + + locations = self.locations[:10] + + _, lower_005, upper_005 = estimator.predict_dte(1, 0, locations, alpha=0.05) + _, lower_010, upper_010 = estimator.predict_dte(1, 0, locations, alpha=0.10) + + width_005 = upper_005 - lower_005 + width_010 = upper_010 - lower_010 + + self.assertTrue(np.all(width_010 < width_005))