Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 0 additions & 115 deletions tests/test_adjusted_estimator.py

This file was deleted.

106 changes: 105 additions & 1 deletion tests/test_local_estimators.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,72 @@
import unittest
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import LogisticRegression, LinearRegression
from dte_adj import SimpleLocalDistributionEstimator, AdjustedLocalDistributionEstimator

np.random.seed(123)


def generate_data(n=1000, S=4):
# Generate W ~ U(0,1)
W = np.random.uniform(0, 1, n)

# Assign strata based on W
strata = np.digitize(W, np.linspace(0, 1, S + 1)[1:])

# Generate X ~ N(0, I_20)
X = np.random.randn(n, 20)

# Treatment assignment Z ~ Bernoulli(0.5) within each stratum
Z = np.zeros(n)
for s in range(S):
indices = np.where(strata == s)[0]
Z[indices] = np.random.binomial(1, 0.5, size=len(indices))

# Define functions b(X, W) and c(X, W)
def b(X, W):
return (
np.sin(np.pi * X[:, 0] * X[:, 1])
+ 2 * (X[:, 2] - 0.5) ** 2
+ X[:, 3]
+ 0.5 * X[:, 4]
+ 0.1 * W
)

def c(X, W):
return 0.1 * (X[:, 0] + np.log(1 + np.exp(X[:, 1])) + W)

# Define parameters
a1, a0 = 4, 1
b1, b0 = 1, -1
c1, c0 = 3, 3

# Generate errors
epsilon = np.random.randn(n)

# Compute Y(d)
Y0 = a0 + b(X, W) + epsilon
Y1 = a1 + b(X, W) + epsilon

# Compute D(0) and D(1)
D0 = (b0 + c(X, W) > c0 * epsilon).astype(int)
D1 = np.where(D0 == 0, (b1 + c(X, W) > c1 * epsilon).astype(int), 1)

# Compute observed D and Y
D = D1 * Z + D0 * (1 - Z)
Y = Y1 * D + Y0 * (1 - D)

# discrete
Y = np.random.poisson(np.abs(Y))

return {
"W": W,
"X": X,
"Z": Z,
"D": D,
"Y": Y,
"strata": strata,
}


class TestLocalEstimators(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -232,3 +296,43 @@ def test_adjusted_local_estimator_predict_lpte(self):
self.assertTrue(np.all(lower_bound <= upper_bound))
self.assertTrue(np.all(lower_bound <= beta))
self.assertTrue(np.all(beta <= upper_bound))


class TestE2E(unittest.TestCase):
def test_e2e(self):
# Arrange
data = generate_data(n=3000)
X, D, Y, Z, S = data["X"], data["W"], data["Y"], data["Z"], data["strata"]
locations = np.array([np.percentile(Y, p) for p in range(10, 91, 10)])
simple_estimator = SimpleLocalDistributionEstimator()
adjusted_estimator = AdjustedLocalDistributionEstimator(LinearRegression())

# Act
simple_estimator.fit(X, Z, D, Y, S)
adjusted_estimator.fit(X, Z, D, Y, S)

simple_dte, simple_lower_bound, simple_upper_bound = (
simple_estimator.predict_dte(1, 0, locations)
)
adjusted_dte, adjusted_lower_bound, adjusted_upper_bound = (
adjusted_estimator.predict_dte(1, 0, locations)
)

# Assert
np.testing.assert_(np.all(simple_dte < 0), "Not all values are negative")
np.testing.assert_(np.all(adjusted_dte < 0), "Not all values are negative")
np.testing.assert_(
np.all(simple_lower_bound < simple_upper_bound),
"Upper bound is less than lower bound",
)
np.testing.assert_(
np.all(adjusted_lower_bound < adjusted_upper_bound),
"Upper bound is less than lower bound",
)
np.testing.assert_(
np.all(
adjusted_upper_bound - adjusted_lower_bound
< simple_upper_bound - simple_lower_bound
),
"Adjusted estimator does not have narrower intervals",
)
Loading
Loading