Skip to content

Commit

Permalink
Add RankBasedAnomalyScorer as an anomaly scorer
Browse files Browse the repository at this point in the history
This is very closely related to the RescaledMedianCDFQuantileScorer but a statistically more well-defined approach. However, it is also slightly more conservative for small sample sizes.
Furthermore, it changes the way equal samples are counted in MedianCDFQuantileScorer by also including the test sample itself. This prevents a p-value of 0.

Signed-off-by: Patrick Bloebaum <bloebp@amazon.com>
  • Loading branch information
bloebp committed May 28, 2024
1 parent ff068cc commit 1eff1ba
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 18 deletions.
89 changes: 80 additions & 9 deletions dowhy/gcm/anomaly_scorers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
class MedianCDFQuantileScorer(AnomalyScorer):
"""Given an anomalous observation x and samples from the distribution of X, this score represents:
score(x) = 1 - 2 * min[P(X > x) + P(X = x) / 2, P(X < x) + P(X = x) / 2]
Here, the value x is considered as part of X for the computation.
Comparing two NaN values are considered equal here.
Expand All @@ -26,14 +27,15 @@ class MedianCDFQuantileScorer(AnomalyScorer):
X = [-3, -2, -1, 0, 1, 2, 3]
x = 2.5
Then, x falls in the right sided-quantile and only one sample in X is larger than x. Therefore, we get
p(X >= x) = 1 / 7
P(X <= x) = 6 / 7
With the end score of:
1 - 2 * min[P(X > x) + P(X = x) / 2, P(X < x) + P(X = x) / 2] = 1 - 2 / 7 = 0.71
P(X > x) = 1 / 8
P(X < x) = 6 / 8
P(X = x) = 1 / 8
We divide by 8 here, because we consider x itself.
This gives us a score of:
1 - 2 * min[P(X > x) + P(X = x) / 2, P(X < x) + P(X = x) / 2] = 1 - 3 / 8 = 0.625
Note: For equal samples, we contribute half of the count to the left and half of the count the right side.
The higher the score, the less likely the sample comes from the distribution of X.
Note: For a statistically more rigorous, but also more conservative version, see RankBasedAnomalyScorer.
"""

def __init__(self):
Expand All @@ -51,12 +53,12 @@ def score(self, X: np.ndarray) -> np.ndarray:

X = shape_into_2d(X.astype(float))

equal_samples = np.sum(np.isclose(X, self._distribution_samples, rtol=0, atol=0, equal_nan=True), axis=1)
equal_samples = np.sum(np.isclose(X, self._distribution_samples, rtol=0, atol=0, equal_nan=True), axis=1) + 1
greater_samples = np.sum(X > self._distribution_samples, axis=1) + equal_samples / 2
smaller_samples = np.sum(X < self._distribution_samples, axis=1) + equal_samples / 2

return (
1 - 2 * np.amin(np.vstack([greater_samples, smaller_samples]), axis=0) / self._distribution_samples.shape[0]
return 1 - 2 * np.amin(np.vstack([greater_samples, smaller_samples]), axis=0) / (
self._distribution_samples.shape[0] + 1
)


Expand Down Expand Up @@ -88,6 +90,75 @@ def score(self, X: np.ndarray) -> np.ndarray:
return -np.log(scores)


class RankBasedAnomalyScorer(AnomalyScorer):
"""Similar to the RescaledMedianCDFQuantileScorer, but this scorer is more directly based on ranks and the
assumption of exchangeability.
This scorer computes anomaly scores for test samples by evaluating their ranks within the training samples (and a
given sample). For each test sample, the scorer computes its rank from above (number of samples greater than or
equal to it) and rank from below (number of samples less than or equal to it). It then calculates a p-value based
on these ranks, under the assumption of exchangeability. The p-value then represents the probability of observing
a rank as extreme as the observed rank or more extreme.
Specifically, the p-value is computed as the minimum of:
1. Twice the rank from above divided by the total number of samples.
2. Twice the rank from below divided by the total number of samples.
3. 1 (to ensure the p-value is at most 1).
This method is non-parametric and makes no assumptions about the underlying distribution of the data.
The anomaly score is then calculated as the negative log of this p-value (i.e. it is an information-theoretic
(IT) score). Higher anomaly scores indicate a lower probability, consequently, a higher likelihood of being an anomaly.
For example:
X = [-3, -2, -1, 0, 1, 2, 3]
x = 2.5
Then,
p(x >= X) = 7 / 8
P(x <= X) = 1 / 8
Which gives the p-value:
-log(min[1, 2 * 7 / 8, 2 * 1 / 8]) = -log(2 / 8) =
"""

def __init__(self):
self._distribution_samples = None

def fit(self, X: np.ndarray) -> None:
if (X.ndim == 2 and X.shape[1] > 1) or X.ndim > 2:
raise ValueError("The RankBasedAnomalyScorer currently only supports one-dimensional data!")

self._distribution_samples = X.reshape(-1)

def score(self, X: np.ndarray) -> np.ndarray:
if self._distribution_samples is None:
raise ValueError("Scorer has not been fitted!")

X = shape_into_2d(X)

# Compute rank of every single test point in the union of the training and the respective test point.
# + 1 here to count the test sample itself.
equal_samples = np.sum(np.isclose(X, self._distribution_samples, rtol=0, atol=0, equal_nan=True), axis=1) + 1
ranks_from_above = np.sum(X > self._distribution_samples, axis=1) + equal_samples
ranks_from_below = np.sum(X < self._distribution_samples, axis=1) + equal_samples

# The probability to get at most rank k from above is k divided by the total number of samples. Similar for
# the case of below. Therefore, to get at most rank k either from above or below is
# min(2*k/total_num_samples, 1). We then get a p-value for exchangeability:
p_values = np.amin(
np.vstack(
[
2 * ranks_from_above / (self._distribution_samples.shape[0] + 1),
2 * ranks_from_below / (self._distribution_samples.shape[0] + 1),
np.ones(X.shape[0]),
]
),
axis=0,
)

return -np.log(p_values)


class ITAnomalyScorer(AnomalyScorer):
"""Transforms any anomaly scorer into an information theoretic (IT) score. This means, given a scorer S(x), an
anomalous observation x and samples from the distribution of X, this scorer class represents:
Expand Down
2 changes: 1 addition & 1 deletion tests/gcm/test_anomaly.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def test_given_multivariate_inputs_when_estimate_anomaly_scores_then_does_not_ra

fit(causal_model, data)
scores = anomaly_scores(causal_model, data_anomaly)
assert scores["X1"][0] == approx(-np.log(EPS), abs=3)
assert scores["X1"][0] == approx(-np.log(1 / (data.shape[0] + 1)), abs=3)


@flaky(max_runs=3)
Expand Down
25 changes: 21 additions & 4 deletions tests/gcm/test_anomaly_scorers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from pytest import approx

from dowhy.gcm import MedianCDFQuantileScorer, MedianDeviationScorer, RescaledMedianCDFQuantileScorer
from dowhy.gcm.anomaly_scorers import RankBasedAnomalyScorer


def test_given_simple_toy_data_when_using_MedianCDFQuantileScorer_then_returns_expected_scores():
anomaly_scorer = MedianCDFQuantileScorer()
anomaly_scorer.fit(np.array(range(0, 20)))
assert anomaly_scorer.score(np.array([8, 17]))[0] == approx(1 - 17 / 20, abs=0.01)
assert anomaly_scorer.score(np.array([0])) == approx(1 - 1 / 20, abs=0.01)
assert anomaly_scorer.score(np.array([8, 17]))[0] == approx(1 - 18 / 21, abs=0.01)
assert anomaly_scorer.score(np.array([0])) == approx(1 - 2 / 21, abs=0.01)

anomaly_scorer.fit(np.array([4.0 for i in range(200)]))
assert anomaly_scorer.score(np.array([4])) == approx(0, abs=0.01)
Expand All @@ -27,7 +28,7 @@ def test_given_data_with_nans_when_using_median_quantile_scorer_with_nan_support
scorer.fit(training_data)

assert scorer.score(np.array([1, 4, 8, np.nan])) == approx(
[-np.log(2 * 0.5 / 10), -np.log(2 * 3.5 / 10), -np.log(2 * 0.5 / 10), -np.log(2 * 1 / 10)]
[-np.log(2 * 1 / 11), -np.log(2 * 4 / 11), -np.log(2 * 1 / 11), -np.log(2 * 1.5 / 11)]
)


Expand All @@ -38,5 +39,21 @@ def test_given_numpy_arrays_with_object_type_when_using_median_quantile_scorer_t
scorer.fit(training_data)

assert scorer.score(np.array([1, 4, 8, np.nan], dtype=object)) == approx(
[-np.log(2 * 0.5 / 10), -np.log(2 * 3.5 / 10), -np.log(2 * 0.5 / 10), -np.log(2 * 1 / 10)]
[-np.log(2 * 1 / 11), -np.log(2 * 4 / 11), -np.log(2 * 1 / 11), -np.log(2 * 1.5 / 11)]
)


def test_when_using_ranked_based_anomaly_scorer_then_returns_expected_scores():
training_data = np.array([1, 2, 3, 4, 5, 6, 7, 8, np.nan, np.nan])

scorer = RankBasedAnomalyScorer()
scorer.fit(training_data)

assert scorer.score(np.array([1, 4, 9, np.nan])) == approx(
[-np.log(2 * 2 / 11), -np.log(2 * 5 / 11), -np.log(2 * 1 / 11), -np.log(2 * 3 / 11)]
)

training_data = np.array([np.nan, np.nan, np.nan, np.nan])
scorer.fit(training_data)

assert scorer.score(np.array([1, np.nan])) == approx([-np.log(2 * 1 / 5), -np.log(1)])
5 changes: 1 addition & 4 deletions tests/gcm/test_ml.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,8 @@ def test_given_categorical_input_and_output_features_when_fit_classification_mod
mdl = create_logistic_regression_classifier()
mdl.fit(inputs, X2)

X2["True" == 1] = 1
X2["False" == 0] = 0

assert mdl.predict_probabilities(np.array([[2, "1"]], dtype=object)) == approx(np.array([[0, 1]]), abs=0.01)
assert np.sum(np.argmax(mdl.predict_probabilities(inputs), axis=1) != X2) < 20
assert np.sum(mdl.predict(inputs).reshape(-1) != X2) < 20

_, counts = np.unique(mdl.predict(inputs), return_counts=True)
assert counts / 1000 == approx(np.array([0.5, 0.5]), abs=0.05)
Expand Down

0 comments on commit 1eff1ba

Please sign in to comment.