From 739cec9d6d36e80868841310cf9878631295cc96 Mon Sep 17 00:00:00 2001 From: Patrick Bloebaum Date: Tue, 28 May 2024 09:31:51 -0700 Subject: [PATCH] Add RankBasedAnomalyScorer as an anomaly scorer This is very closely related to the RescaledMedianCDFQuantileScorer but a statistically more well-defined approach. However, it is also slightly more conservative for small sample sizes. Furthermore, it changes the way equal samples are counted in MedianCDFQuantileScorer by also including the test sample itself. This prevents a p-value of 0. --- dowhy/gcm/anomaly_scorers.py | 89 +++++++++++++++++++++++++++---- tests/gcm/test_anomaly_scorers.py | 25 +++++++-- 2 files changed, 101 insertions(+), 13 deletions(-) diff --git a/dowhy/gcm/anomaly_scorers.py b/dowhy/gcm/anomaly_scorers.py index 9a4735fa02..6fd800c8d1 100644 --- a/dowhy/gcm/anomaly_scorers.py +++ b/dowhy/gcm/anomaly_scorers.py @@ -14,6 +14,7 @@ class MedianCDFQuantileScorer(AnomalyScorer): """Given an anomalous observation x and samples from the distribution of X, this score represents: score(x) = 1 - 2 * min[P(X > x) + P(X = x) / 2, P(X < x) + P(X = x) / 2] + Here, the value x is considered as part of X for the computation. Comparing two NaN values are considered equal here. @@ -26,14 +27,15 @@ class MedianCDFQuantileScorer(AnomalyScorer): X = [-3, -2, -1, 0, 1, 2, 3] x = 2.5 Then, x falls in the right sided-quantile and only one sample in X is larger than x. Therefore, we get - p(X >= x) = 1 / 7 - P(X <= x) = 6 / 7 - With the end score of: - 1 - 2 * min[P(X > x) + P(X = x) / 2, P(X < x) + P(X = x) / 2] = 1 - 2 / 7 = 0.71 + P(X > x) = 1 / 8 + P(X < x) = 6 / 8 + P(X = x) = 1 / 8 + We divide by 8 here, because we consider x itself. + This gives us a score of: + 1 - 2 * min[P(X > x) + P(X = x) / 2, P(X < x) + P(X = x) / 2] = 1 - 3 / 8 = 0.625 Note: For equal samples, we contribute half of the count to the left and half of the count the right side. - - The higher the score, the less likely the sample comes from the distribution of X. + Note: For a statistically more rigorous, but also more conservative version, see RankBasedAnomalyScorer. """ def __init__(self): @@ -51,12 +53,12 @@ def score(self, X: np.ndarray) -> np.ndarray: X = shape_into_2d(X.astype(float)) - equal_samples = np.sum(np.isclose(X, self._distribution_samples, rtol=0, atol=0, equal_nan=True), axis=1) + equal_samples = np.sum(np.isclose(X, self._distribution_samples, rtol=0, atol=0, equal_nan=True), axis=1) + 1 greater_samples = np.sum(X > self._distribution_samples, axis=1) + equal_samples / 2 smaller_samples = np.sum(X < self._distribution_samples, axis=1) + equal_samples / 2 - return ( - 1 - 2 * np.amin(np.vstack([greater_samples, smaller_samples]), axis=0) / self._distribution_samples.shape[0] + return 1 - 2 * np.amin(np.vstack([greater_samples, smaller_samples]), axis=0) / ( + self._distribution_samples.shape[0] + 1 ) @@ -88,6 +90,75 @@ def score(self, X: np.ndarray) -> np.ndarray: return -np.log(scores) +class RankBasedAnomalyScorer(AnomalyScorer): + """Similar to the RescaledMedianCDFQuantileScorer, but this scorer is more directly based on ranks and the + assumption of exchangeability. + + This scorer computes anomaly scores for test samples by evaluating their ranks within the training samples (and a + given sample). For each test sample, the scorer computes its rank from above (number of samples greater than or + equal to it) and rank from below (number of samples less than or equal to it). It then calculates a p-value based + on these ranks, under the assumption of exchangeability. The p-value then represents the probability of observing + a rank as extreme as the observed rank or more extreme. + + Specifically, the p-value is computed as the minimum of: + 1. Twice the rank from above divided by the total number of samples. + 2. Twice the rank from below divided by the total number of samples. + 3. 1 (to ensure the p-value is at most 1). + + This method is non-parametric and makes no assumptions about the underlying distribution of the data. + + The anomaly score is then calculated as the negative log of this p-value (i.e. it is an information-theoretic + (IT) score). Higher anomaly scores indicate a lower probability, consequently, a higher likelihood of being an anomaly. + + For example: + X = [-3, -2, -1, 0, 1, 2, 3] + x = 2.5 + Then, + p(x >= X) = 7 / 8 + P(x <= X) = 1 / 8 + Which gives the p-value: + -log(min[1, 2 * 7 / 8, 2 * 1 / 8]) = -log(2 / 8) = + + """ + + def __init__(self): + self._distribution_samples = None + + def fit(self, X: np.ndarray) -> None: + if (X.ndim == 2 and X.shape[1] > 1) or X.ndim > 2: + raise ValueError("The RankBasedAnomalyScorer currently only supports one-dimensional data!") + + self._distribution_samples = X.reshape(-1) + + def score(self, X: np.ndarray) -> np.ndarray: + if self._distribution_samples is None: + raise ValueError("Scorer has not been fitted!") + + X = shape_into_2d(X) + + # Compute rank of every single test point in the union of the training and the respective test point. + # + 1 here to count the test sample itself. + equal_samples = np.sum(np.isclose(X, self._distribution_samples, rtol=0, atol=0, equal_nan=True), axis=1) + 1 + ranks_from_above = np.sum(X > self._distribution_samples, axis=1) + equal_samples + ranks_from_below = np.sum(X < self._distribution_samples, axis=1) + equal_samples + + # The probability to get at most rank k from above is k divided by the total number of samples. Similar for + # the case of below. Therefore, to get at most rank k either from above or below is + # min(2*k/total_num_samples, 1). We then get a p-value for exchangeability: + p_values = np.amin( + np.vstack( + [ + 2 * ranks_from_above / (self._distribution_samples.shape[0] + 1), + 2 * ranks_from_below / (self._distribution_samples.shape[0] + 1), + np.ones(X.shape[0]), + ] + ), + axis=0, + ) + + return -np.log(p_values) + + class ITAnomalyScorer(AnomalyScorer): """Transforms any anomaly scorer into an information theoretic (IT) score. This means, given a scorer S(x), an anomalous observation x and samples from the distribution of X, this scorer class represents: diff --git a/tests/gcm/test_anomaly_scorers.py b/tests/gcm/test_anomaly_scorers.py index e28fc42c90..718cf14380 100644 --- a/tests/gcm/test_anomaly_scorers.py +++ b/tests/gcm/test_anomaly_scorers.py @@ -2,13 +2,14 @@ from pytest import approx from dowhy.gcm import MedianCDFQuantileScorer, MedianDeviationScorer, RescaledMedianCDFQuantileScorer +from dowhy.gcm.anomaly_scorers import RankBasedAnomalyScorer def test_given_simple_toy_data_when_using_MedianCDFQuantileScorer_then_returns_expected_scores(): anomaly_scorer = MedianCDFQuantileScorer() anomaly_scorer.fit(np.array(range(0, 20))) - assert anomaly_scorer.score(np.array([8, 17]))[0] == approx(1 - 17 / 20, abs=0.01) - assert anomaly_scorer.score(np.array([0])) == approx(1 - 1 / 20, abs=0.01) + assert anomaly_scorer.score(np.array([8, 17]))[0] == approx(1 - 18 / 21, abs=0.01) + assert anomaly_scorer.score(np.array([0])) == approx(1 - 2 / 21, abs=0.01) anomaly_scorer.fit(np.array([4.0 for i in range(200)])) assert anomaly_scorer.score(np.array([4])) == approx(0, abs=0.01) @@ -27,7 +28,7 @@ def test_given_data_with_nans_when_using_median_quantile_scorer_with_nan_support scorer.fit(training_data) assert scorer.score(np.array([1, 4, 8, np.nan])) == approx( - [-np.log(2 * 0.5 / 10), -np.log(2 * 3.5 / 10), -np.log(2 * 0.5 / 10), -np.log(2 * 1 / 10)] + [-np.log(2 * 1 / 11), -np.log(2 * 4 / 11), -np.log(2 * 1 / 11), -np.log(2 * 1.5 / 11)] ) @@ -38,5 +39,21 @@ def test_given_numpy_arrays_with_object_type_when_using_median_quantile_scorer_t scorer.fit(training_data) assert scorer.score(np.array([1, 4, 8, np.nan], dtype=object)) == approx( - [-np.log(2 * 0.5 / 10), -np.log(2 * 3.5 / 10), -np.log(2 * 0.5 / 10), -np.log(2 * 1 / 10)] + [-np.log(2 * 1 / 11), -np.log(2 * 4 / 11), -np.log(2 * 1 / 11), -np.log(2 * 1.5 / 11)] ) + + +def test_when_using_ranked_based_anomaly_scorer_then_returns_expected_scores(): + training_data = np.array([1, 2, 3, 4, 5, 6, 7, 8, np.nan, np.nan]) + + scorer = RankBasedAnomalyScorer() + scorer.fit(training_data) + + assert scorer.score(np.array([1, 4, 9, np.nan])) == approx( + [-np.log(2 * 2 / 11), -np.log(2 * 5 / 11), -np.log(2 * 1 / 11), -np.log(2 * 3 / 11)] + ) + + training_data = np.array([np.nan, np.nan, np.nan, np.nan]) + scorer.fit(training_data) + + assert scorer.score(np.array([1, np.nan])) == approx([-np.log(2 * 1 / 5), -np.log(1)])