Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions examples/knn_interpretability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
"""Example of using kNN for outlier detection with interpretability
Sample wise interpretation is provided here.
"""
# Author: Alaa Abdelwahab <alaa.ashraf.uni@gmail.com>
# License: BSD 2 clause

from __future__ import division
from __future__ import print_function

import os
import sys

# temporary solution for relative imports in case pyod is not installed
# if pyod is installed, no need to use the following line
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname("__file__"), '..')))

from scipy.io import loadmat
from sklearn.model_selection import train_test_split

from pyod.models.knn import KNN
from pyod.utils.utility import standardizer

if __name__ == "__main__":
# Define data file and read X and y
mat_file = 'cardio.mat'

mat = loadmat(os.path.join('data', mat_file))
X = mat['X']
y = mat['y'].ravel()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4,
random_state=1)

# standardizing data for processing
X_train_norm, X_test_norm = standardizer(X_train, X_test)

# train kNN detector
clf_name = 'KNN'
clf = KNN(n_neighbors=10, method='mean')
clf.fit(X_train)

# get the prediction labels and outlier scores of the training data
y_train_pred = clf.labels_ # binary labels (0: inliers, 1: outliers)
y_train_scores = clf.decision_scores_ # raw outlier scores

print('Training data has %d samples' % X_train.shape[0])
print('Training data has %d features' % X_train.shape[1])

# Explain the first sample
print('\nExplaining sample 0:')
print('True label:', 'Outlier' if y_train[0] == 1 else 'Inlier')
print('Predicted label:', 'Outlier' if y_train_pred[0] == 1 else 'Inlier')
print('Outlier score: %.4f' % y_train_scores[0])
print('\nGenerating dimensional outlier explanation...')

clf.explain_outlier(0)

# The horizontal bar chart shows per-feature average distances to k-neighbors.
# Features with bars exceeding the cutoff lines (dashed/dash-dot vertical lines)
# are the primary contributors to the sample being classified as an outlier.

# Example with custom cutoffs
print('\n' + '='*60)
print('Example with custom cutoff bands (80th and 95th percentiles):')
print('='*60)

clf.explain_outlier(0, cutoffs=[0.80, 0.95])

213 changes: 213 additions & 0 deletions pyod/models/knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from warnings import warn

import matplotlib.pyplot as plt
import numpy as np
from sklearn.neighbors import BallTree
from sklearn.neighbors import NearestNeighbors
Expand Down Expand Up @@ -167,6 +168,8 @@ def __init__(self, contamination=0.1, n_neighbors=5, method='largest',
metric_params=self.metric_params,
n_jobs=self.n_jobs,
**kwargs)
# Cache for dimensional scores to avoid recomputation
self._cached_dimensional_scores = {} # {columns_tuple: scores_array}

def fit(self, X, y=None):
"""Fit detector. y is ignored in unsupervised methods.
Expand All @@ -189,6 +192,12 @@ def fit(self, X, y=None):
X = check_array(X)
self._set_n_classes(y)

# Store training data for explainability
self.X_train_ = X

# Clear cache when fitting new data
self._cached_dimensional_scores = {}

self.neigh_.fit(X)

# In certain cases, _tree does not exist for NearestNeighbors
Expand Down Expand Up @@ -275,3 +284,207 @@ def _get_dist_by_method(self, dist_arr):
return np.mean(dist_arr, axis=1)
elif self.method == 'median':
return np.median(dist_arr, axis=1)

def get_outlier_explainability_scores(self, ind, columns=None):
"""Compute per-feature outlier explainability scores.

Calculates the average absolute distance to k-nearest neighbors
for each feature dimension.

Parameters
----------
ind : int
Index of the sample in training data.

columns : list, optional (default=None)
Specific feature indices. If None, use all features.

Returns
-------
scores : numpy array of shape (n_features,)
Average absolute distance to k-neighbors per dimension.
"""
check_is_fitted(self, ['X_train_', 'tree_'])

sample = self.X_train_[ind:ind+1, :]
_, neighbor_indices = self.tree_.query(sample, k=self.n_neighbors)
neighbors = self.X_train_[neighbor_indices[0], :]

if columns is None:
dim_distances = np.abs(neighbors - self.X_train_[ind, :])
else:
dim_distances = np.abs(neighbors[:, columns] - self.X_train_[ind, columns])

return np.mean(dim_distances, axis=0)


def explain_outlier(self, ind, columns=None, cutoffs=None,
feature_names=None, file_name=None,
file_type=None, max_features_per_plot=20,
compute_cutoffs=True): # pragma: no cover
"""Plot dimensional outlier graph for a given data point.

Parameters
----------
ind : int
The index of the data point to explain.

columns : list, optional
Specify a list of features/dimensions for plotting. If not
specified, use all features.

cutoffs : list of floats in (0., 1), optional (default=[0.95, 0.99])
The significance cutoff bands of the dimensional outlier graph.

feature_names : list of strings, optional
The display names of all columns of the dataset,
to show on the y-axis of the plot.

file_name : string, optional
The name to save the figure.

file_type : string, optional
The file type to save the figure.

max_features_per_plot : int, optional (default=20)
Maximum number of features per plot. Splits into multiple plots if exceeded.

compute_cutoffs : bool, optional (default=True)
If True, computes dimensional scores for all samples to generate cutoff bands.
If False, only computes dimensional score for the target sample (much faster).
When True, results are cached for subsequent calls.

Returns
-------
scores : numpy array
The per-feature outlier scores for the specified sample.
"""
check_is_fitted(self, ['X_train_', 'tree_', 'labels_'])

if columns is None:
columns = list(range(self.X_train_.shape[1]))

cutoffs = [1 - self.contamination, 0.99] if cutoffs is None else cutoffs

# Compute dimensional scores for target sample
dim_scores = self.get_outlier_explainability_scores(ind, columns)

# Compute cutoff bands if requested
if compute_cutoffs:
cache_key = tuple(columns)
if cache_key in self._cached_dimensional_scores:
all_scores = self._cached_dimensional_scores[cache_key]
else:
all_scores = np.zeros((self.X_train_.shape[0], len(columns)))
for i in range(self.X_train_.shape[0]):
all_scores[i, :] = self.get_outlier_explainability_scores(i, columns)
self._cached_dimensional_scores[cache_key] = all_scores

cutoff_bands = {c: np.quantile(all_scores, q=c, axis=0) for c in cutoffs}
else:
cutoff_bands = None

# Set feature names
if feature_names is None:
feature_names = [f'Feature {i}' for i in columns]

# Split into chunks if needed
n_features = len(columns)
chunks = []
for start in range(0, n_features, max_features_per_plot):
end = min(start + max_features_per_plot, n_features)
chunk_cutoffs = None
if cutoff_bands:
chunk_cutoffs = {c: cutoff_bands[c][start:end] for c in cutoffs}
chunks.append((columns[start:end], dim_scores[start:end],
feature_names[start:end], chunk_cutoffs))

# Plot each chunk
for chunk_idx, (chunk_cols, chunk_scores, chunk_names, chunk_cutoffs) in enumerate(chunks):
self._plot_explanation_chunk(ind, chunk_cols, chunk_scores, chunk_names,
chunk_cutoffs, chunk_idx, len(chunks),
file_name, file_type)

return dim_scores


def _plot_explanation_chunk(self, ind, columns, scores, names, cutoff_bands,
idx, total, file_name, file_type):
"""Helper to plot one feature chunk."""
n = len(columns)
fig, ax = plt.subplots(figsize=(10, max(6, n * 0.4)))
y_pos = np.arange(n)

# Determine colors based on cutoffs
if cutoff_bands:
cutoffs_sorted = sorted(cutoff_bands.keys())
colors = []
for i, score in enumerate(scores):
if len(cutoffs_sorted) >= 2:
if score >= cutoff_bands[cutoffs_sorted[-1]][i]:
colors.append('#d62728') # Red
elif score >= cutoff_bands[cutoffs_sorted[0]][i]:
colors.append('#ff7f0e') # Orange
else:
colors.append('#1f77b4') # Blue
else:
colors.append('#d62728' if score >= cutoff_bands[cutoffs_sorted[0]][i] else '#1f77b4')
else:
colors = ['#000000'] * n # Black when no cutoffs

# Plot bars
bars = ax.barh(y_pos, scores, color=colors, alpha=0.7,
edgecolor='black', linewidth=0.5)

# Add cutoff lines
if cutoff_bands:
cutoffs_sorted = sorted(cutoff_bands.keys())
styles = ['--', '-.', ':']
line_colors = ['#ff7f0e', '#d62728', '#8c564b']
for cutoff, style, color in zip(cutoffs_sorted, styles, line_colors):
for i, val in enumerate(cutoff_bands[cutoff]):
ax.plot([val, val], [i-0.4, i+0.4], style, color=color, linewidth=2,
label=f'{cutoff:.2f} Cutoff' if i == 0 else "")

# Formatting
ax.set_yticks(y_pos)
ax.set_yticklabels(names)
ax.set_xlabel('Average Distance to k-Neighbors', fontsize=12)
ax.set_ylabel('Features', fontsize=12)

label = 'Outlier' if self.labels_[ind] == 1 else 'Inlier'
overall_knn_score = self.decision_scores_[ind]

if total > 1:
title = (f'Outlier score breakdown for sample #{ind+1} ({label})\n'
f'k={self.n_neighbors} | Overall KNN={overall_knn_score:.3f} | '
f'Features {columns[0]+1}-{columns[-1]+1} (Part {idx+1}/{total})')
else:
title = (f'Outlier score breakdown for sample #{ind+1} ({label})\n'
f'k={self.n_neighbors}, method={self.method} | Overall KNN={overall_knn_score:.3f}')
ax.set_title(title, fontsize=14, fontweight='bold')

# Value labels
for bar, score in zip(bars, scores):
ax.text(bar.get_width(), bar.get_y() + bar.get_height()/2,
f' {score:.3f}', ha='left', va='center', fontsize=9)

# Legend
if cutoff_bands:
handles, labels = ax.get_legend_handles_labels()
by_label = dict(zip(labels, handles))
if by_label:
ax.legend(by_label.values(), by_label.keys(), loc='best', framealpha=0.9)

ax.grid(axis='x', alpha=0.3, linestyle=':')
plt.tight_layout()

# Save file
if file_name:
name = f'{file_name}_part{idx+1}of{total}' if total > 1 else file_name
if file_type:
plt.savefig(f'{name}.{file_type}', dpi=300, bbox_inches='tight')
else:
plt.savefig(f'{name}.png', dpi=300, bbox_inches='tight')

plt.show()
43 changes: 43 additions & 0 deletions pyod/test/test_knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,49 @@ def test_predict_rank_normalized(self):
assert_array_less(pred_ranks, 1.01)
assert_array_less(-0.1, pred_ranks)

def test_get_outlier_explainability_scores(self):
"""Test get_outlier_explainability_scores() method.

Validates that the method returns correct dimensional scores
for known outlier and inlier samples.
"""
# Create a simple 2D dataset where outliers are obvious
# Point [10, 0] is an outlier in Dimension 0 (X), but normal in Dimension 1 (Y)
X_train = np.array([[0, 0], [1, 0], [0, 1], [1, 1], [10, 0]])

# Fit model
clf = KNN(n_neighbors=3, contamination=0.2)
clf.fit(X_train)

# Test explaining the outlier (index 4: [10, 0])
scores = clf.get_outlier_explainability_scores(ind=4)

# Check shape: should have 1 score per feature
assert_equal(scores.shape[0], X_train.shape[1])

# Check that scores are non-negative (distances cannot be negative)
assert_array_less(-1e-10, scores)

# For the outlier [10, 0], dimension 0 should have much higher score than dimension 1
# because it's far from neighbors in X but close in Y
assert (scores[0] > scores[1]), \
"Outlier in dimension 0 should have higher score than dimension 1"

# Test explaining an inlier (index 0: [0, 0])
scores_inlier = clf.get_outlier_explainability_scores(ind=0)
assert_equal(scores_inlier.shape[0], X_train.shape[1])
assert_array_less(-1e-10, scores_inlier)

# Test with specific columns parameter
scores_cols = clf.get_outlier_explainability_scores(ind=4, columns=[0])
assert_equal(scores_cols.shape[0], 1)
assert_array_less(-1e-10, scores_cols)

# Test with multiple columns
scores_multi = clf.get_outlier_explainability_scores(ind=4, columns=[0, 1])
assert_equal(scores_multi.shape[0], 2)
assert_array_less(-1e-10, scores_multi)

def test_model_clone(self):
clone_clf = clone(self.clf)

Expand Down