-
Notifications
You must be signed in to change notification settings - Fork 61
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #27 from ndeutschmann/lmad-signoff
Latent metric anomaly detection
- Loading branch information
Showing
29 changed files
with
2,216 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import unittest | ||
from typing import Type | ||
from unittest import TestCase | ||
|
||
import numpy as np | ||
from torch import nn | ||
|
||
from tests.utils import PlusOne | ||
from uq360.algorithms.layer_scoring.latent_scorer import LatentScorer | ||
|
||
|
||
class LatentScorerTester(TestCase):
    """Abstract base test for ``LatentScorer`` subclasses.

    Concrete subclasses must set the class attributes below. The base
    class itself is skipped via ``setUpClass``.
    """

    # Scorer subclass under test.
    ScorerClass: Type[LatentScorer]
    # Extra keyword arguments passed to the scorer constructor.
    scorer_kwargs: dict
    # Extra keyword arguments for predict (reserved for subclasses).
    predict_kwargs: dict
    # Whether fit() expects labels y in addition to X.
    fit_with_y: bool

    @classmethod
    def setUpClass(cls):
        # The base class declares no ScorerClass, so it must not run.
        if cls is LatentScorerTester:
            raise unittest.SkipTest("Skipping base class LatentScorerTester")
        super().setUpClass()

    def setUp(self) -> None:
        # Tiny deterministic model x -> relu(x + 1); the "+1" layer is the
        # latent-extraction point used by the tests below.
        self.p1 = PlusOne()
        self.relu = nn.ReLU()
        self.model = nn.Sequential(self.p1, self.relu)
        self.layer = self.p1

    @staticmethod
    def verify_latent_scorer_get_latents_case(
        ScorerClass: Type[LatentScorer],
        *,
        model: nn.Module = None,
        layer: nn.Module = None,
        X: np.ndarray,
        expected_z: np.ndarray,
        **scorer_kwargs,
    ) -> bool:
        """Return True iff ``get_latents(X)`` is close to ``expected_z``."""
        scorer = ScorerClass(model=model, layer=layer, **scorer_kwargs)
        z = scorer.get_latents(X)
        return np.allclose(z, expected_z)

    def test_latent_scorer_get_latents(self):
        with self.subTest("No model extraction"):
            # Without a model, get_latents must be the identity.
            X = np.random.normal(size=(10, 5))
            self.assertTrue(
                self.verify_latent_scorer_get_latents_case(
                    self.ScorerClass, X=X, expected_z=X, **self.scorer_kwargs
                )
            )

        with self.subTest("Model layer extraction"):
            # With a model/layer, latents are the layer output (x + 1).
            X = np.random.normal(size=(10, 5))
            expected_z = X + 1.0
            self.assertTrue(
                self.verify_latent_scorer_get_latents_case(
                    self.ScorerClass,
                    model=self.model,
                    layer=self.layer,
                    X=X,
                    expected_z=expected_z,
                    **self.scorer_kwargs,
                )
            )

    def _make_fit_data(self, n: int = 100, d: int = 2) -> dict:
        # Random fit data; integer labels added when the scorer needs them.
        fit_data = dict(X=np.random.normal(size=(n, d)))
        if self.fit_with_y:
            fit_data.update(y=np.random.randint(0, 3, size=n))
        return fit_data

    def test_fit_direct(self):
        # fit() on raw features (no latent extraction) must not raise.
        try:
            scorer = self.ScorerClass(**self.scorer_kwargs)
            scorer.fit(**self._make_fit_data())
        except Exception as e:
            self.fail(f"{self.ScorerClass.__name__}.fit failed with {e}")

    def test_fit_latent(self):
        # fit() through the model's latent layer must not raise.
        try:
            scorer = self.ScorerClass(
                model=self.model, layer=self.layer, **self.scorer_kwargs
            )
            scorer.fit(**self._make_fit_data())
        except Exception as e:
            self.fail(f"{self.ScorerClass.__name__}.fit failed with {e}")

    def test_predict_direct(self):
        # Two tight clusters around 0 and 1; a query far outside the data
        # (20) must score as more anomalous than a query inside it (0).
        n_per_class = 50
        d = 3
        X1 = np.random.normal(1.0, 0.1, size=(n_per_class, d)).astype(np.float32)
        X0 = np.random.normal(0.0, 0.1, size=(n_per_class, d)).astype(np.float32)
        fit_data = dict(X=np.concatenate([X0, X1], axis=0))
        if self.fit_with_y:
            y1 = np.ones(shape=(n_per_class,)).astype(np.float32)
            y0 = np.zeros(shape=(n_per_class,)).astype(np.float32)
            fit_data.update(y=np.concatenate([y0, y1]))

        scorer = self.ScorerClass(**self.scorer_kwargs)
        scorer.fit(**fit_data)

        n_query = 3
        X_query_0 = 0.0 * np.ones((n_query, d)).astype(np.float32)
        X_query_20 = 20.0 * np.ones((n_query, d)).astype(np.float32)

        d0 = np.mean(scorer.predict(X_query_0))
        d20 = np.mean(scorer.predict(X_query_20))

        self.assertGreater(d20, d0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from tests.test_layer_scoring import LatentScorerTester | ||
from uq360.algorithms.layer_scoring.aklpe import AKLPEScorer | ||
from uq360.utils.transformers.nearest_neighbors.exact import ExactNearestNeighbors | ||
from uq360.utils.transformers.nearest_neighbors.faiss import FAISSNearestNeighbors | ||
from uq360.utils.transformers.nearest_neighbors.pynndescent import PyNNDNearestNeighbors | ||
|
||
|
||
class TestAKLPEScorerExact(LatentScorerTester):
    """AKLPE scorer backed by brute-force (exact) nearest-neighbor search."""

    ScorerClass = AKLPEScorer
    fit_with_y = False
    scorer_kwargs = dict(nearest_neighbors=ExactNearestNeighbors, n_neighbors=5)
|
||
|
||
class TestAKLPEScorerPyNNDescent(LatentScorerTester):
    """AKLPE scorer backed by the PyNNDescent approximate-NN index."""

    ScorerClass = AKLPEScorer
    fit_with_y = False
    scorer_kwargs = dict(nearest_neighbors=PyNNDNearestNeighbors, n_neighbors=5)
|
||
|
||
class TestAKLPEScorerFAISS(LatentScorerTester):
    """AKLPE scorer backed by the FAISS nearest-neighbor index."""

    ScorerClass = AKLPEScorer
    fit_with_y = False
    scorer_kwargs = dict(nearest_neighbors=FAISSNearestNeighbors, n_neighbors=5)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from tests.test_layer_scoring import LatentScorerTester | ||
from uq360.algorithms.layer_scoring.knn import KNNScorer | ||
from uq360.utils.transformers.nearest_neighbors.exact import ExactNearestNeighbors | ||
from uq360.utils.transformers.nearest_neighbors.faiss import FAISSNearestNeighbors | ||
from uq360.utils.transformers.nearest_neighbors.pynndescent import PyNNDNearestNeighbors | ||
|
||
|
||
|
||
class TestKNNScorerExact(LatentScorerTester):
    """KNN scorer backed by brute-force (exact) nearest-neighbor search."""

    ScorerClass = KNNScorer
    fit_with_y = False
    scorer_kwargs = dict(nearest_neighbors=ExactNearestNeighbors, n_neighbors=5)
|
||
|
||
class TestKNNScorerPyNNDescent(LatentScorerTester):
    """KNN scorer backed by the PyNNDescent approximate-NN index."""

    ScorerClass = KNNScorer
    fit_with_y = False
    scorer_kwargs = dict(nearest_neighbors=PyNNDNearestNeighbors, n_neighbors=5)
|
||
|
||
class TestKNNScorerFAISS(LatentScorerTester):
    """KNN scorer backed by the FAISS nearest-neighbor index."""

    ScorerClass = KNNScorer
    fit_with_y = False
    scorer_kwargs = dict(nearest_neighbors=FAISSNearestNeighbors, n_neighbors=5)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from tests.test_layer_scoring import LatentScorerTester | ||
from uq360.algorithms.layer_scoring.mahalanobis import MahalanobisScorer | ||
|
||
|
||
class TestMahalanobisScorer(LatentScorerTester):
    """Mahalanobis-distance scorer; fit() requires class labels."""

    ScorerClass = MahalanobisScorer
    fit_with_y = True
    scorer_kwargs = dict()
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
from unittest import TestCase | ||
|
||
import torch | ||
|
||
from tests.utils import PlusOne | ||
from uq360.utils.latent_features import LatentFeatures | ||
|
||
|
||
class TestLatentFeatures(TestCase):
    """Tests for LatentFeatures activation extraction on a toy network."""

    N = 10
    d = 2

    def setUp(self):
        # Model: x -> relu(x + 1) + 1, each stage kept as an attribute so
        # individual layers can be targeted for extraction.
        self.first_plus_one = PlusOne()
        self.relu = torch.nn.ReLU()
        self.second_plus_one = PlusOne()
        self.m = torch.nn.Sequential(
            self.first_plus_one, self.relu, self.second_plus_one
        )

        self.X_ones = torch.ones((self.N, self.d), dtype=torch.float32)
        self.X_norm = torch.empty((self.N, self.d), dtype=torch.float32).normal_()

    def test_init(self):
        # Fixture construction in setUp alone must succeed.
        pass

    def test_get_first_plus(self):
        # Extracting at the first layer yields x + 1.
        extractor = LatentFeatures(self.m, self.first_plus_one)
        with self.subTest():
            latent = extractor.extract(self.X_ones)[0]
            self.assertTrue(torch.all(latent == 2.0))
        with self.subTest():
            latent = extractor.extract(self.X_norm)[0]
            self.assertTrue(torch.all(latent == (self.X_norm + 1.0)))

    def test_get_relu(self):
        # Extracting at the ReLU yields relu(x + 1).
        extractor = LatentFeatures(self.m, self.relu)
        with self.subTest():
            latent = extractor.extract(self.X_ones)[0]
            self.assertTrue(torch.all(latent == 2.0))
        with self.subTest():
            latent = extractor.extract(self.X_norm)[0]
            expected = torch.clamp(self.X_norm + 1.0, min=0.0)
            self.assertTrue(torch.all(latent == expected))

    def test_multilayer(self):
        # Extracting at two layers at once returns both activations in order.
        extractor = LatentFeatures(self.m, [self.first_plus_one, self.relu])
        with self.subTest():
            latent_1, latent_2 = extractor.extract(self.X_ones)
            self.assertTrue(torch.all(latent_1 == 2.0))
            self.assertTrue(torch.all(latent_2 == 2.0))
        with self.subTest():
            latent_1, latent_2 = extractor.extract(self.X_norm)
            self.assertTrue(torch.all(latent_1 == (self.X_norm + 1.0)))
            expected_2 = torch.clamp(self.X_norm + 1.0, min=0.0)
            self.assertTrue(torch.all(latent_2 == expected_2))

    def test_hook_cleanup(self):
        # extract() must remove any forward hooks it registered, leaving
        # each layer's hook dict exactly as it was before.
        for layer in (self.first_plus_one, self.relu, self.second_plus_one):
            with self.subTest():
                hooks_before = layer._forward_hooks.copy()
                extractor = LatentFeatures(self.m, layer)
                extractor.extract(self.X_norm)
                self.assertDictEqual(hooks_before, layer._forward_hooks)

    def test_post_processing_fn(self):
        # The post-processing function is applied to the extracted latents.
        extractor = LatentFeatures(
            self.m, self.first_plus_one, post_processing_fn=lambda t: t ** 2
        )
        for batch in (self.X_ones, self.X_norm):
            with self.subTest():
                squared = extractor.extract(batch)[0]
                self.assertTrue(torch.all(squared == (batch + 1) ** 2))
|
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from unittest import TestCase | ||
|
||
import numpy as np | ||
|
||
from uq360.utils.transformers.group_scaler import GroupScaler | ||
|
||
|
||
class TestGroupScaler(TestCase):
    """Tests for GroupScaler per-class centering."""

    def setUp(self) -> None:
        # Five classes, seven samples each, drawn around class-index means
        # so each class occupies a distinct region of feature space.
        self.n_classes = 5
        self.n_per_class = 7
        self.d = 3
        self.scaler = GroupScaler()

        blocks = []
        labels = []
        for cls_idx in range(self.n_classes):
            blocks.append(
                np.random.normal(cls_idx, 0.2, size=(self.n_per_class, self.d))
            )
            labels.append(np.full(self.n_per_class, cls_idx, dtype=float))
        self.X = np.concatenate(blocks, axis=0)
        self.y = np.concatenate(labels)

    def _assert_classes_centered(self, X_norm):
        # Every class of the transformed data should have mean ~0.
        for cls_idx in range(self.n_classes):
            with self.subTest():
                mask = self.y == cls_idx
                self.assertTrue(np.allclose(np.mean(X_norm[mask], axis=0), 0.0))

    def test_fit(self):
        # fit() must not raise on well-formed input.
        try:
            self.scaler.fit(self.X, self.y)
        except Exception as e:
            self.fail(f"GroupScaler.fit failed with {e}")

    def test_transform(self):
        # After fitting, transform must center each class at zero.
        self.scaler.fit(self.X, self.y)
        self._assert_classes_centered(self.scaler.transform(self.X, self.y))

    def test_fit_transform(self):
        # fit_transform must center each class in a single step.
        self._assert_classes_centered(self.scaler.fit_transform(self.X, self.y))
Oops, something went wrong.