From c510b4a0e7bcfc22812cadd8c475ba508d482b5c Mon Sep 17 00:00:00 2001 From: Amrit K Date: Tue, 14 Nov 2023 10:25:39 -0500 Subject: [PATCH] Convert task classes to more generic so they can be re-used --- .pre-commit-config.yaml | 4 +- cyclops/models/wrappers/utils.py | 4 +- cyclops/tasks/__init__.py | 2 - cyclops/tasks/base.py | 20 +- cyclops/tasks/classification.py | 484 +++++++++++++++++- cyclops/tasks/cxr_classification.py | 209 -------- cyclops/tasks/mortality_prediction.py | 341 ------------ cyclops/tasks/utils.py | 17 +- .../cyclops/tasks/test_cxr_classification.py | 35 -- .../tasks/test_mortality_prediction.py | 35 -- 10 files changed, 503 insertions(+), 648 deletions(-) delete mode 100644 cyclops/tasks/cxr_classification.py delete mode 100644 cyclops/tasks/mortality_prediction.py delete mode 100644 tests/cyclops/tasks/test_cxr_classification.py delete mode 100644 tests/cyclops/tasks/test_mortality_prediction.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1ac557aea..c0746e188 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,7 +21,7 @@ repos: - id: black - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: 'v0.1.0' + rev: 'v0.1.5' hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] @@ -34,7 +34,7 @@ repos: entry: python3 -m mypy --config-file pyproject.toml language: system types: [python] - exclude: "use_cases|tests|cyclops/(process|models|tasks|monitor|report/plot)" + exclude: "use_cases|tests|cyclops/(process|models|monitor|report/plot)" - repo: local hooks: diff --git a/cyclops/models/wrappers/utils.py b/cyclops/models/wrappers/utils.py index 1b3015a72..a8b8665c2 100644 --- a/cyclops/models/wrappers/utils.py +++ b/cyclops/models/wrappers/utils.py @@ -4,7 +4,7 @@ import os import random from collections import defaultdict -from typing import Mapping, Sequence, Union +from typing import Any, Mapping, Sequence, Union import numpy as np import torch @@ -65,7 +65,7 @@ def to_tensor( ) -def to_numpy(X): +def to_numpy(X) -> Union[np.typing.NDArray[Any], Sequence, Mapping]: """Convert the input to a numpy array. Parameters diff --git a/cyclops/tasks/__init__.py b/cyclops/tasks/__init__.py index 4fcd45abb..64afadd4b 100644 --- a/cyclops/tasks/__init__.py +++ b/cyclops/tasks/__init__.py @@ -4,5 +4,3 @@ BinaryTabularClassificationTask, MultilabelImageClassificationTask, ) -from cyclops.tasks.cxr_classification import CXRClassificationTask -from cyclops.tasks.mortality_prediction import MortalityPredictionTask diff --git a/cyclops/tasks/base.py b/cyclops/tasks/base.py index dc95dfab7..f897143cd 100644 --- a/cyclops/tasks/base.py +++ b/cyclops/tasks/base.py @@ -44,18 +44,16 @@ def __init__( """ self.models = prepare_models(models) self._validate_models() - self.task_features = ( - [task_features] if isinstance(task_features, str) else task_features - ) + self.task_features = task_features self.task_target = ( [task_target] if isinstance(task_target, str) else task_target ) self.device = get_device() - self.trained_models = [] - self.pretrained_models = [] + self.trained_models: List[str] = [] + self.pretrained_models: List[str] = [] @property - def models_count(self): + def models_count(self) -> int: """Number of models in the task. Returns @@ -92,7 +90,7 @@ def data_type(self) -> str: """ raise NotImplementedError - def list_models(self): + def list_models(self) -> List[str]: """List the names of the models in the task. 
        Returns
@@ -104,14 +102,14 @@ def list_models(self):
         return list(self.models.keys())
 
     @abstractmethod
-    def _validate_models(self):
+    def _validate_models(self) -> None:
         """Validate the models for the task data type."""
         raise NotImplementedError
 
     def add_model(
         self,
         model: Union[str, WrappedModel, Dict[str, WrappedModel]],
-    ):
+    ) -> None:
         """Add a model to the task.
 
         Parameters
@@ -167,7 +165,7 @@ def save_model(
         self,
         filepath: Union[str, Dict[str, str]],
         model_names: Optional[Union[str, List[str]]] = None,
-        **kwargs,
+        **kwargs: Any,
     ) -> None:
         """Save the model to a specified filepath.
 
@@ -219,7 +217,7 @@ def load_model(
         self,
         filepath: str,
         model_name: Optional[str] = None,
-        **kwargs,
+        **kwargs: Any,
     ) -> WrappedModel:
         """Load a pretrained model.
 
diff --git a/cyclops/tasks/classification.py b/cyclops/tasks/classification.py
index aa9b221b1..025b34f4c 100644
--- a/cyclops/tasks/classification.py
+++ b/cyclops/tasks/classification.py
@@ -1,12 +1,37 @@
 """Classification tasks."""
 
+import logging
+from functools import partial
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+import numpy as np
+import pandas as pd
+from datasets import Dataset, DatasetDict, config
+from monai.transforms import Compose  # type: ignore
+from sklearn.compose import ColumnTransformer
+from sklearn.exceptions import NotFittedError
+
+from cyclops.data.slicer import SliceSpec
+from cyclops.evaluate.evaluator import evaluate
+from cyclops.evaluate.fairness.config import FairnessConfig
+from cyclops.evaluate.metrics.factory import create_metric
+from cyclops.evaluate.metrics.metric import MetricCollection
 from cyclops.models.catalog import (
     _img_model_keys,
     _model_names_mapping,
     _static_model_keys,
 )
+from cyclops.models.utils import get_split
+from cyclops.models.wrappers import WrappedModel
+from cyclops.models.wrappers.sk_model import SKModel
+from cyclops.models.wrappers.utils import to_numpy
 from cyclops.tasks.base import BaseTask
+from cyclops.tasks.utils import apply_image_transforms
+from cyclops.utils.log import setup_logging
+
+
+LOGGER = logging.getLogger(__name__)
+setup_logging(print_level="INFO", logger=LOGGER)
 
 
 class BinaryTabularClassificationTask(BaseTask):
@@ -36,13 +61,303 @@ def data_type(self) -> str:
         """
         return "tabular"
 
-    def _validate_models(self):
+    def _validate_models(self) -> None:
         """Validate the models for the task data type."""
         assert all(
-            _model_names_mapping.get(model.model.__name__) in _static_model_keys
+            _model_names_mapping.get(model.model.__name__) in _static_model_keys  # type: ignore
             for model in self.models.values()
         ), "All models must be static type model."
 
+    def train(
+        self,
+        X: Union[np.typing.NDArray[Any], pd.DataFrame, Dataset, DatasetDict],
+        y: Optional[Union[np.typing.NDArray[Any], pd.Series]] = None,
+        model_name: Optional[str] = None,
+        transforms: Optional[ColumnTransformer] = None,
+        best_model_params: Optional[Dict[str, Any]] = None,
+        splits_mapping: Optional[Dict[str, str]] = None,
+        **kwargs: Any,
+    ) -> WrappedModel:
+        """Fit a model on tabular data.
+
+        Parameters
+        ----------
+        X : Union[np.ndarray, pd.DataFrame, Dataset, DatasetDict]
+            Data features.
+        y : Optional[Union[np.ndarray, pd.Series]]
+            Data labels, required when the input data is not a Hugging Face dataset, \
+            by default None
+        model_name : Optional[str], optional
+            Model name, required if more than one model exists, \
+            by default None
+        transforms : Optional[ColumnTransformer], optional
+            Transformations to be applied to the data before \
+            fitting the model, by default None
+        best_model_params : Optional[Dict[str, Any]], optional
+            Parameters for finding the best model, by default None
+        splits_mapping: Optional[dict], optional
+            Mapping from 'train', 'validation' and 'test' to dataset splits names \
+            used when input is a dataset dictionary, \
+            by default {"train": "train", "validation": "validation"}
+        **kwargs: Any, optional
+            Additional parameters for the model.
+
+        Returns
+        -------
+        WrappedModel
+            The trained model.
+
+        """
+        if splits_mapping is None:
+            splits_mapping = {"train": "train", "validation": "validation"}
+        model_name, model = self.get_model(model_name)
+        if isinstance(X, (Dataset, DatasetDict)):
+            if best_model_params:
+                metric = best_model_params.pop("metric", None)
+                method = best_model_params.pop("method", "grid")
+                model.find_best(
+                    best_model_params,
+                    X,
+                    feature_columns=self.task_features,
+                    target_columns=self.task_target,
+                    transforms=transforms,
+                    metric=metric,
+                    method=method,
+                    splits_mapping=splits_mapping,
+                    **kwargs,
+                )
+            else:
+                model.fit(
+                    X,
+                    feature_columns=self.task_features,
+                    target_columns=self.task_target,
+                    transforms=transforms,
+                    splits_mapping=splits_mapping,
+                    **kwargs,
+                )
+
+        else:
+            if y is None:
+                raise ValueError(
+                    "Missing data labels 'y'. Please provide the labels for \
+                    the training data when not using a Hugging Face dataset \
+                    as the input.",
+                )
+
+            X = to_numpy(X)
+            if transforms is not None:
+                try:
+                    X = transforms.transform(X)
+                except NotFittedError:
+                    X = transforms.fit_transform(X)
+            y = to_numpy(y)
+            assert len(X) == len(y)
+
+            if best_model_params:
+                metric = best_model_params.pop("metric", None)
+                method = best_model_params.pop("method", "grid")
+                model.find_best(
+                    best_model_params,
+                    X,
+                    y=y,  # type: ignore
+                    metric=metric,
+                    method=method,
+                    **kwargs,
+                )
+            else:
+                model.fit(X, y, **kwargs)  # type: ignore
+
+        self.trained_models.append(model_name)
+        return model
+
+    def predict(
+        self,
+        dataset: Union[np.typing.NDArray[Any], pd.DataFrame, Dataset, DatasetDict],
+        model_name: Optional[str] = None,
+        transforms: Optional[ColumnTransformer] = None,
+        proba: bool = True,
+        splits_mapping: Optional[Dict[str, str]] = None,
+        **kwargs: Any,
+    ) -> Union[np.typing.NDArray[Any], Dataset]:
+        """Predict the labels on the given tabular dataset.
+
+        Parameters
+        ----------
+        dataset : Union[np.ndarray, pd.DataFrame, Dataset, DatasetDict]
+            Data features.
+        model_name : Optional[str], optional
+            Model name, required if more than one model exists, by default None
+        transforms : Optional[ColumnTransformer], optional
+            Transformations to be applied to the data before \
+            prediction. This is used when the input is a \
+            Hugging Face Dataset, by default None
+        proba: bool
+            Predict probabilities, default True
+        splits_mapping: Optional[dict], optional
+            Mapping from 'train', 'validation' and 'test' to dataset splits names, \
+            used when input is a dataset dictionary, by default {"test": "test"}
+        **kwargs: Any, optional
+            Additional parameters for the prediction.
+
+        Returns
+        -------
+        Union[np.ndarray, Dataset]
+            Predicted labels or the Hugging Face dataset with predicted labels.
+
+        Raises
+        ------
+        NotFittedError
+            If the model is not fitted or not loaded with a pretrained estimator.
+
+        """
+        if splits_mapping is None:
+            splits_mapping = {"test": "test"}
+        model_name, model = self.get_model(model_name)
+        if model_name not in self.pretrained_models + self.trained_models:
+            raise NotFittedError(
+                f"It seems you have neither trained the {model_name} model nor \
+                loaded a pretrained model.",
+            )
+
+        if isinstance(dataset, (Dataset, DatasetDict)):
+            if proba and isinstance(model, SKModel):
+                return model.predict_proba(
+                    dataset,
+                    feature_columns=self.task_features,
+                    transforms=transforms,
+                    model_name=model_name,
+                    splits_mapping=splits_mapping,
+                    **kwargs,
+                )
+
+            return model.predict(
+                dataset,
+                feature_columns=self.task_features,
+                transforms=transforms,
+                model_name=model_name,
+                splits_mapping=splits_mapping,
+                **kwargs,
+            )
+
+        dataset = to_numpy(dataset)
+
+        if transforms is not None:
+            try:
+                dataset = transforms.transform(dataset)
+            except NotFittedError:
+                LOGGER.warning("Fitting preprocessor on evaluation dataset.")
+                dataset = transforms.fit_transform(dataset)
+
+        if proba and isinstance(model, SKModel):
+            predictions = model.predict_proba(dataset, **kwargs)
+        else:
+            predictions = model.predict(dataset, **kwargs)
+
+        return predictions
+
+    def evaluate(
+        self,
+        dataset: Union[Dataset, DatasetDict],
+        metrics: Union[List[str], MetricCollection],
+        model_names: Optional[Union[str, List[str]]] = None,
+        transforms: Optional[ColumnTransformer] = None,
+        prediction_column_prefix: str = "predictions",
+        splits_mapping: Optional[Dict[str, str]] = None,
+        slice_spec: Optional[SliceSpec] = None,
+        batch_size: int = config.DEFAULT_MAX_BATCH_SIZE,
+        remove_columns: Optional[Union[str, List[str]]] = None,
+        fairness_config: Optional[FairnessConfig] = None,
+        override_fairness_metrics: bool = False,
+    ) -> Tuple[Dict[str, Any], Dataset]:
+        """Evaluate model(s) on a HuggingFace dataset.
+
+        Parameters
+        ----------
+        dataset : Union[Dataset, DatasetDict]
+            HuggingFace dataset.
+        metrics : Union[List[str], MetricCollection]
+            Metrics to be evaluated.
+        model_names : Union[str, List[str]], optional
+            Model names to be evaluated, if not specified all fitted models \
+            will be used for evaluation, by default None
+        transforms : Optional[ColumnTransformer], optional
+            Transformations to be applied to the data before prediction, \
+            by default None
+        prediction_column_prefix : str, optional
+            Name of the prediction column to be added to \
+            the dataset, by default "predictions"
+        splits_mapping: Optional[dict], optional
+            Mapping from 'train', 'validation' and 'test' to dataset splits names \
+            used when input is a dataset dictionary, by default {"test": "test"}
+        slice_spec : Optional[SliceSpec], optional
+            Specifications for creating slices of a dataset, by default None
+        batch_size : int, optional
+            Batch size for batched prediction and evaluation, \
+            by default config.DEFAULT_MAX_BATCH_SIZE
+        remove_columns : Optional[Union[str, List[str]]], optional
+            Unnecessary columns to be removed from the dataset, by default None
+        fairness_config : Optional[FairnessConfig], optional
+            The configuration for computing fairness metrics. If None, no fairness \
+            metrics will be computed, by default None
+        override_fairness_metrics : bool, optional
+            If True, the `metrics` argument in fairness_config will be overridden by \
+            the `metrics` argument, by default False
+
+        Returns
+        -------
+        Tuple[Dict[str, Any], Dataset]
+            Dictionary with evaluation results and the dataset with prediction \
+            columns added.
+
+        """
+        if splits_mapping is None:
+            splits_mapping = {"test": "test"}
+        if isinstance(metrics, list) and len(metrics):
+            metrics_collection = MetricCollection(
+                [
+                    create_metric(
+                        m,
+                        task=self.task_type,
+                        num_labels=len(self.task_features),
+                    )
+                    for m in metrics
+                ],
+            )
+        else:
+            # `metrics` is already a MetricCollection; use it directly so it is
+            # always defined when passed to `evaluate` below.
+            metrics_collection = metrics
+
+        if isinstance(model_names, str):
+            model_names = [model_names]
+        elif not model_names:
+            model_names = self.pretrained_models + self.trained_models
+
+        for model_name in model_names:
+            if model_name not in self.pretrained_models + self.trained_models:
+                LOGGER.warning(
+                    "It seems you have neither trained the model nor \
+                    loaded a pretrained model.",
+                )
+
+            dataset = self.predict(
+                dataset,
+                model_name=model_name,
+                transforms=transforms,
+                prediction_column_prefix=prediction_column_prefix,
+                only_predictions=False,
+                splits_mapping=splits_mapping,
+            )
+
+        results = evaluate(
+            dataset=dataset,
+            metrics=metrics_collection,
+            target_columns=self.task_target,
+            slice_spec=slice_spec,
+            prediction_column_prefix=prediction_column_prefix,
+            remove_columns=remove_columns,
+            split=splits_mapping["test"],
+            batch_size=batch_size,
+            fairness_config=fairness_config,
+            override_fairness_metrics=override_fairness_metrics,
+        )
+        return results, dataset
+
 
 class MultilabelImageClassificationTask(BaseTask):
-    """Binary tabular classification task."""
+    """Multilabel image classification task."""
 
@@ -71,12 +386,173 @@ def data_type(self) -> str:
         """
         return "image"
 
-    def _validate_models(self):
+    def _validate_models(self) -> None:
         """Validate the models for the task data type."""
         assert all(
-            _model_names_mapping.get(model.model.__name__) in _img_model_keys
+            _model_names_mapping.get(model.model.__name__) in _img_model_keys  # type: ignore
             for model in self.models.values()
         ), "All models must be image type model."
 
         for model in self.models.values():
             model.initialize()
 
+    def predict(
+        self,
+        dataset: Union[np.typing.NDArray[Any], Dataset, DatasetDict],
+        model_name: Optional[str] = None,
+        transforms: Optional[Compose] = None,
+        splits_mapping: Optional[Dict[str, str]] = None,
+        **kwargs: Any,
+    ) -> Union[np.typing.NDArray[Any], Dataset]:
+        """Predict the labels on the given image dataset.
+
+        Parameters
+        ----------
+        dataset : Union[np.ndarray, Dataset, DatasetDict]
+            Image representation as a numpy array or a Hugging Face dataset.
+        model_name : Optional[str], optional
+            Model name, required if more than one model exists, by default None
+        transforms : Optional[Compose], optional
+            Transforms to be applied to the data, by default None
+        splits_mapping: Optional[dict], optional
+            Mapping from 'train', 'validation' and 'test' to dataset splits names, \
+            used when input is a dataset dictionary, by default {"test": "test"}
+        **kwargs: Any, optional
+            Additional parameters for the prediction.
+
+        Returns
+        -------
+        Union[np.typing.NDArray[Any], Dataset]
+            Predicted labels or the Hugging Face dataset with predicted labels.
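+
+        Examples
+        --------
+        A minimal, hypothetical sketch; the dataset ``image_ds``, the target
+        labels, and the model choice below are illustrative placeholders, not
+        values fixed by this API:
+
+        >>> from cyclops.models.catalog import create_model
+        >>> task = MultilabelImageClassificationTask(
+        ...     {"densenet": create_model("densenet")},
+        ...     task_features=["image"],
+        ...     task_target=["Atelectasis", "Effusion"],
+        ... )
+        >>> predictions = task.predict(image_ds, model_name="densenet")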
+
+        """
+        if splits_mapping is None:
+            splits_mapping = {"test": "test"}
+        model_name, model = self.get_model(model_name)
+
+        if transforms:
+            transforms = partial(apply_image_transforms, transforms=transforms)  # type: ignore
+
+        if isinstance(dataset, (Dataset, DatasetDict)):
+            return model.predict(
+                dataset,
+                feature_columns=self.task_features,
+                transforms=transforms,
+                model_name=model_name,
+                splits_mapping=splits_mapping,
+                **kwargs,
+            )
+
+        return model.predict(dataset, **kwargs)
+
+    def evaluate(
+        self,
+        dataset: Union[Dataset, DatasetDict],
+        metrics: Union[List[str], MetricCollection],
+        model_names: Optional[Union[str, List[str]]] = None,
+        transforms: Optional[Compose] = None,
+        prediction_column_prefix: str = "predictions",
+        splits_mapping: Optional[Dict[str, str]] = None,
+        slice_spec: Optional[SliceSpec] = None,
+        batch_size: int = 64,
+        remove_columns: Optional[Union[str, List[str]]] = None,
+        fairness_config: Optional[FairnessConfig] = None,
+        override_fairness_metrics: bool = False,
+    ) -> Tuple[Dict[str, Any], Dataset]:
+        """Evaluate model(s) on a HuggingFace dataset.
+
+        Parameters
+        ----------
+        dataset : Union[Dataset, DatasetDict]
+            HuggingFace dataset.
+        metrics : Union[List[str], MetricCollection]
+            Metrics to be evaluated.
+        model_names : Union[str, List[str]], optional
+            Model names to be evaluated, required if more than one model exists, \
+            by default None
+        transforms : Optional[Compose], optional
+            Transforms to be applied to the data, by default None
+        prediction_column_prefix : str, optional
+            Name of the prediction column to be added to the dataset, \
+            by default "predictions"
+        splits_mapping: Optional[dict], optional
+            Mapping from 'train', 'validation' and 'test' to dataset splits names \
+            used when input is a dataset dictionary, by default {"test": "test"}
+        slice_spec : Optional[SliceSpec], optional
+            Specifications for creating slices of a dataset, by default None
+        batch_size : int, optional
+            Batch size for batched evaluation, by default 64
+        remove_columns : Optional[Union[str, List[str]]], optional
+            Unnecessary columns to be removed from the dataset, by default None
+        fairness_config : Optional[FairnessConfig], optional
+            The configuration for computing fairness metrics. If None, no fairness \
+            metrics will be computed, by default None
+        override_fairness_metrics : bool, optional
+            If True, the `metrics` argument in fairness_config will be overridden by \
+            the `metrics` argument, by default False
+
+        Returns
+        -------
+        Tuple[Dict[str, Any], Dataset]
+            Dictionary with evaluation results and the dataset with prediction \
+            columns added.
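+
+        Examples
+        --------
+        A hypothetical sketch; ``task`` is a task instance as above, ``image_ds``
+        is a placeholder Hugging Face dataset, and the metric name is
+        illustrative rather than a guaranteed registry key:
+
+        >>> results, pred_ds = task.evaluate(
+        ...     image_ds,
+        ...     metrics=["multilabel_auroc"],
+        ...     batch_size=32,
+        ... )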
+
+        """
+        if splits_mapping is None:
+            splits_mapping = {"test": "test"}
+        if isinstance(dataset, DatasetDict):
+            split = get_split(dataset, "test", splits_mapping=splits_mapping)
+            dataset = dataset[split]
+
+        missing_labels = [
+            label for label in self.task_target if label not in dataset.column_names
+        ]
+        if len(missing_labels):
+
+            def add_missing_labels(examples: Dict[str, Any]) -> Dict[str, Any]:
+                for label in missing_labels:
+                    examples[label] = 0.0
+                return examples
+
+            dataset = dataset.map(add_missing_labels)
+
+        if isinstance(metrics, list) and len(metrics):
+            metrics_collection = MetricCollection(
+                [
+                    create_metric(
+                        m,
+                        task=self.task_type,
+                        num_labels=len(self.task_target),
+                    )
+                    for m in metrics
+                ],
+            )
+        else:
+            # `metrics` is already a MetricCollection; use it directly so it is
+            # always defined when passed to `evaluate` below.
+            metrics_collection = metrics
+
+        if isinstance(model_names, str):
+            model_names = [model_names]
+        elif model_names is None:
+            model_names = self.list_models()
+
+        for model_name in model_names:
+            dataset = self.predict(
+                dataset,
+                model_name=model_name,
+                transforms=transforms,
+                prediction_column_prefix=prediction_column_prefix,
+                only_predictions=False,
+                splits_mapping=splits_mapping,
+            )
+
+        results = evaluate(
+            dataset=dataset,
+            metrics=metrics_collection,
+            slice_spec=slice_spec,
+            target_columns=self.task_target,
+            prediction_column_prefix=prediction_column_prefix,
+            remove_columns=remove_columns,
+            split=splits_mapping["test"],
+            batch_size=batch_size,
+            fairness_config=fairness_config,
+            override_fairness_metrics=override_fairness_metrics,
+        )
+
+        return results, dataset
diff --git a/cyclops/tasks/cxr_classification.py b/cyclops/tasks/cxr_classification.py
deleted file mode 100644
index 838878ef9..000000000
--- a/cyclops/tasks/cxr_classification.py
+++ /dev/null
@@ -1,209 +0,0 @@
-"""Chest X-ray Classification Task."""
-
-import logging
-from functools import partial
-from typing import Any, Dict, List, Optional, Sequence, Union
-
-import numpy as np
-from datasets import Dataset, DatasetDict
-from monai.transforms import Compose
-
-from cyclops.data.slicer import SliceSpec
-from cyclops.evaluate.evaluator import evaluate
-from cyclops.evaluate.fairness.config import FairnessConfig
-from cyclops.evaluate.metrics.factory import create_metric
-from cyclops.evaluate.metrics.metric import MetricCollection
-from cyclops.models.utils import get_split
-from cyclops.models.wrappers import WrappedModel
-from cyclops.tasks import MultilabelImageClassificationTask
-from cyclops.tasks.utils import CXR_TARGET, apply_image_transforms
-from cyclops.utils.log import setup_logging
-
-
-LOGGER = logging.getLogger(__name__)
-setup_logging(print_level="INFO", logger=LOGGER)
-
-
-class CXRClassificationTask(MultilabelImageClassificationTask):
-    """Chest X-ray classification task modeled as a multi-label classification task."""
-
-    def __init__(
-        self,
-        models: Union[
-            str,
-            WrappedModel,
-            Sequence[Union[str, WrappedModel]],
-            Dict[str, WrappedModel],
-        ],
-        task_features: Union[str, List[str]] = "image",
-        task_target: Union[str, List[str]] = CXR_TARGET,
-    ) -> None:
-        """Chest X-ray classification task.
-
-        Parameters
-        ----------
-        models
-            The model(s) to be used for prediction, and evaluation.
-        task_features : List[str]
-            List of feature names.
-        task_target : str
-            List of target names.
- - """ - super().__init__(models, task_features, task_target) - - def predict( - self, - dataset: Union[np.ndarray, Dataset, DatasetDict], - model_name: Optional[str] = None, - transforms: Optional[Compose] = None, - splits_mapping: dict = None, - **kwargs, - ) -> Union[np.ndarray, Dataset]: - """Predict the pathologies on the given dataset. - - Parameters - ---------- - dataset : Union[np.ndarray, Dataset, DatasetDict] - Image representation as a numpy array or a Hugging Face dataset. - model_name : Optional[str], optional - Model name, required if more than one model exists, by default None - transforms : Optional[Compose], optional - Transforms to be applied to the data, by default None - splits_mapping: Optional[dict], optional - Mapping from 'train', 'validation' and 'test' to dataset splits names, \ - used when input is a dataset dictionary, by default {"test": "test"} - **kwargs: dict, optional - Additional parameters for the prediction. - - Returns - ------- - Union[np.ndarray, Dataset] - Predicted labels or the Hugging Face dataset with predicted labels. - - """ - if splits_mapping is None: - splits_mapping = {"test": "test"} - model_name, model = self.get_model(model_name) - - if transforms: - transforms = partial(apply_image_transforms, transforms=transforms) - - if isinstance(dataset, (Dataset, DatasetDict)): - return model.predict( - dataset, - feature_columns=self.task_features, - transforms=transforms, - model_name=model_name, - splits_mapping=splits_mapping, - **kwargs, - ) - - return model.predict(dataset, **kwargs) - - def evaluate( - self, - dataset: Union[Dataset, DatasetDict], - metrics: Union[List[str], MetricCollection], - model_names: Optional[Union[str, List[str]]] = None, - transforms: Optional[Compose] = None, - prediction_column_prefix: str = "predictions", - splits_mapping: dict = None, - slice_spec: Optional[SliceSpec] = None, - batch_size: int = 64, - remove_columns: Optional[Union[str, List[str]]] = None, - fairness_config: Optional[FairnessConfig] = None, - override_fairness_metrics: bool = False, - ) -> Dict[str, Any]: - """Evaluate model(s) on a HuggingFace dataset. - - Parameters - ---------- - dataset : Union[Dataset, DatasetDict] - HuggingFace dataset. - metrics : Union[List[str], MetricCollection] - Metrics to be evaluated. - model_names : Union[str, List[str]], optional - Model names to be evaluated, required if more than one model exists, \ - by default Nonee - transforms : Optional[Compose], optional - Transforms to be applied to the data, by default None - prediction_column_prefix : str, optional - Name of the prediction column to be added to the dataset, \ - by default "predictions" - splits_mapping: Optional[dict], optional - Mapping from 'train', 'validation' and 'test' to dataset splits names \ - used when input is a dataset dictionary, by default {"test": "test"} - slice_spec : Optional[SliceSpec], optional - Specifications for creating a slices of a dataset, by default None - batch_size : int, optional - Batch size for batched evaluation, by default 64 - remove_columns : Optional[Union[str, List[str]]], optional - Unnecessary columns to be removed from the dataset, by default None - fairness_config : Optional[FairnessConfig], optional - The configuration for computing fairness metrics. 
If None, no fairness \ - metrics will be computed, by default None - override_fairness_metrics : bool, optional - If True, the `metrics` argument in fairness_config will be overridden by \ - the `metrics`, by default False - - Returns - ------- - Dict[str, Any] - Dictionary with evaluation results. - - """ - if splits_mapping is None: - splits_mapping = {"test": "test"} - if isinstance(dataset, DatasetDict): - split = get_split(dataset, "test", splits_mapping=splits_mapping) - dataset = dataset[split] - - missing_labels = [ - label for label in self.task_target if label not in dataset.column_names - ] - if len(missing_labels): - - def add_missing_labels(examples): - for label in missing_labels: - examples[label] = 0.0 - return examples - - dataset = dataset.map(add_missing_labels) - - if isinstance(metrics, list) and len(metrics): - metrics = [ - create_metric(m, task=self.task_type, num_labels=len(self.task_target)) - for m in metrics - ] - metrics = MetricCollection(metrics) - - if isinstance(model_names, str): - model_names = [model_names] - elif model_names is None: - model_names = self.list_models() - - for model_name in model_names: - dataset = self.predict( - dataset, - model_name=model_name, - transforms=transforms, - prediction_column_prefix=prediction_column_prefix, - only_predictions=False, - splits_mapping=splits_mapping, - ) - - results = evaluate( - dataset=dataset, - metrics=metrics, - slice_spec=slice_spec, - target_columns=self.task_target, - prediction_column_prefix=prediction_column_prefix, - remove_columns=remove_columns, - split=splits_mapping["test"], - batch_size=batch_size, - fairness_config=fairness_config, - override_fairness_metrics=override_fairness_metrics, - ) - - return results, dataset diff --git a/cyclops/tasks/mortality_prediction.py b/cyclops/tasks/mortality_prediction.py deleted file mode 100644 index adef88869..000000000 --- a/cyclops/tasks/mortality_prediction.py +++ /dev/null @@ -1,341 +0,0 @@ -"""Mortality Prediction Task.""" - -import logging -from typing import Any, Dict, List, Optional, Sequence, Union - -import numpy as np -import pandas as pd -from datasets import Dataset, DatasetDict, config -from sklearn.compose import ColumnTransformer -from sklearn.exceptions import NotFittedError - -from cyclops.data.slicer import SliceSpec -from cyclops.evaluate.evaluator import evaluate -from cyclops.evaluate.fairness.config import FairnessConfig -from cyclops.evaluate.metrics.factory import create_metric -from cyclops.evaluate.metrics.metric import MetricCollection -from cyclops.models.wrappers import WrappedModel -from cyclops.models.wrappers.sk_model import SKModel -from cyclops.models.wrappers.utils import to_numpy -from cyclops.tasks.classification import BinaryTabularClassificationTask -from cyclops.utils.log import setup_logging - - -LOGGER = logging.getLogger(__name__) -setup_logging(print_level="INFO", logger=LOGGER) - - -class MortalityPredictionTask(BinaryTabularClassificationTask): - """Mortality prediction task for tabular data as binary classification.""" - - def __init__( - self, - models: Union[ - str, - WrappedModel, - Sequence[Union[str, WrappedModel]], - Dict[str, WrappedModel], - ], - task_features: Union[str, List[str]] = None, - task_target: Union[str, List[str]] = None, - ) -> None: - """Mortality prediction task for tabular data. - - Parameters - ---------- - models - The model(s) to be used for training, prediction, and evaluation. - task_features : List[str] - List of feature names. - task_target : str - List of target names. 
- - """ - if task_target is None: - task_target = ["outcome_death"] - if task_features is None: - task_features = ["age", "sex", "admission_type", "admission_location"] - super().__init__(models, task_features, task_target) - - def train( - self, - X: Union[np.ndarray, pd.DataFrame, Dataset, DatasetDict], - y: Optional[Union[np.ndarray, pd.Series]] = None, - model_name: Optional[str] = None, - transforms: Optional[ColumnTransformer] = None, - best_model_params: Optional[dict] = None, - splits_mapping: dict = None, - **kwargs, - ) -> WrappedModel: - """Fit a model on tabular data. - - Parameters - ---------- - X_train : Union[np.ndarray, pd.DataFrame, Dataset, DatasetDict] - Data features. - y_train : Optional[Union[np.ndarray, pd.Series]] - Data labels, required when the input data is not a Hugging Face dataset, \ - by default None - model_name : Optional[str], optional - Model name, required if more than one model exists, \ - by default None - transforms : Optional[ColumnTransformer], optional - Transformations to be applied to the data before \ - fitting the model, by default Noney default None - splits_mapping: Optional[dict], optional - Mapping from 'train', 'validation' and 'test' to dataset splits names \ - used when input is a dataset dictionary, \ - by default {"train": "train", "validation": "validation"} - - Returns - ------- - WrappedModel - The trained model. - - """ - if splits_mapping is None: - splits_mapping = {"train": "train", "validation": "validation"} - model_name, model = self.get_model(model_name) - if isinstance(X, (Dataset, DatasetDict)): - if best_model_params: - metric = best_model_params.pop("metric", None) - method = best_model_params.pop("method", "grid") - model.find_best( - best_model_params, - X, - feature_columns=self.task_features, - target_columns=self.task_target, - transforms=transforms, - metric=metric, - method=method, - splits_mapping=splits_mapping, - **kwargs, - ) - else: - model.fit( - X, - feature_columns=self.task_features, - target_columns=self.task_target, - transforms=transforms, - splits_mapping=splits_mapping, - **kwargs, - ) - - else: - if y is None: - raise ValueError( - "Missing data labels 'y'. Please provide the labels for \ - the training data when not using a Hugging Face dataset \ - as the input.", - ) - - X = to_numpy(X) - if transforms is not None: - try: - X = transforms.transform(X) - except NotFittedError: - X = transforms.fit_transform(X) - y = to_numpy(y) - assert len(X) == len(y) - - if best_model_params: - metric = best_model_params.pop("metric", None) - method = best_model_params.pop("method", "grid") - model.find_best( - best_model_params, - X, - y=y, - metric=metric, - method=method, - **kwargs, - ) - else: - model.fit(X, y, **kwargs) - - self.trained_models.append(model_name) - return model - - def predict( - self, - dataset: Union[np.ndarray, pd.DataFrame, Dataset, DatasetDict], - model_name: Optional[str] = None, - transforms: Optional[ColumnTransformer] = None, - proba: bool = True, - splits_mapping: dict = None, - **kwargs, - ) -> Union[np.ndarray, Dataset]: - """Predict mortality on the given dataset. - - Parameters - ---------- - dataset : Union[np.ndarray, pd.DataFrame, Dataset, DatasetDict] - Data features. - model_name : Optional[str], optional - Model name, required if more than one model exists, by default None - transforms : Optional[ColumnTransformer], optional - Transformations to be applied to the data before \ - prediction. 
This is used when the input is a \ - Hugging Face Dataset, by default None, by default None - proba: bool - Predict probabilities, default True - splits_mapping: Optional[dict], optional - Mapping from 'train', 'validation' and 'test' to dataset splits names, \ - used when input is a dataset dictionary, by default {"test": "test"} - - Returns - ------- - Union[np.ndarray, Dataset] - Predicted labels or the Hugging Face dataset with predicted labels. - - Raises - ------ - NotFittedError - If the model is not fitted or not loaded with a pretrained estimator. - - """ - if splits_mapping is None: - splits_mapping = {"test": "test"} - model_name, model = self.get_model(model_name) - if model_name not in self.pretrained_models + self.trained_models: - raise NotFittedError( - f"It seems you have neither trained the {model_name} model nor \ - loaded a pretrained model.", - ) - - if isinstance(dataset, (Dataset, DatasetDict)): - if proba and isinstance(model, SKModel): - return model.predict_proba( - dataset, - feature_columns=self.task_features, - transforms=transforms, - model_name=model_name, - splits_mapping=splits_mapping, - **kwargs, - ) - - return model.predict( - dataset, - feature_columns=self.task_features, - transforms=transforms, - model_name=model_name, - splits_mapping=splits_mapping, - **kwargs, - ) - - dataset = to_numpy(dataset) - - if transforms is not None: - try: - dataset = transforms.transform(dataset) - except NotFittedError: - LOGGER.warning("Fitting preprocessor on evaluation dataset.") - dataset = transforms.fit_transform(dataset) - - if proba and isinstance(model, SKModel): - predictions = model.predict_proba(dataset, **kwargs) - else: - predictions = model.predict(dataset, **kwargs) - - return predictions - - def evaluate( - self, - dataset: Union[Dataset, DatasetDict], - metrics: Union[List[str], MetricCollection], - model_names: Optional[Union[str, List[str]]] = None, - transforms: Optional[ColumnTransformer] = None, - prediction_column_prefix: str = "predictions", - splits_mapping: dict = None, - slice_spec: Optional[SliceSpec] = None, - batch_size: int = config.DEFAULT_MAX_BATCH_SIZE, - remove_columns: Optional[Union[str, List[str]]] = None, - fairness_config: Optional[FairnessConfig] = None, - override_fairness_metrics: bool = False, - ) -> Dict[str, Any]: - """Evaluate model(s) on a HuggingFace dataset. - - Parameters - ---------- - dataset : Union[Dataset, DatasetDict] - HuggingFace dataset. - metrics : Union[List[str], MetricCollection] - Metrics to be evaluated. 
- model_names : Union[str, List[str]], optional - Model names to be evaluated, if not specified all fitted models \ - will be used for evaluation, by default None - transforms : Optional[ColumnTransformer], optional - Transformations to be applied to the data before prediction, \ - by default None - prediction_column_prefix : str, optional - Name of the prediction column to be added to \ - the dataset, by default "predictions" - splits_mapping: Optional[dict], optional - Mapping from 'train', 'validation' and 'test' to dataset splits names \ - used when input is a dataset dictionary, by default {"test": "test"} - slice_spec : Optional[SlicingConfig], optional - Specifications for creating a slices of a dataset, by default None - batch_size : int, optional - Batch size for batched prediction and evaluation, \ - by default config.DEFAULT_MAX_BATCH_SIZE - remove_columns : Optional[Union[str, List[str]]], optional - Unnecessary columns to be removed from the dataset, by default None - fairness_config : Optional[FairnessConfig], optional - The configuration for computing fairness metrics. If None, no fairness \ - metrics will be computed, by default None - override_fairness_metrics : bool, optional - If True, the `metrics` argument in fairness_config will be overridden by \ - the `metrics`, by default False - - Returns - ------- - Dict[str, Any] - Dictionary with evaluation results. - - """ - if splits_mapping is None: - splits_mapping = {"test": "test"} - if isinstance(metrics, list) and len(metrics): - metrics = [ - create_metric( - m, - task=self.task_type, - num_labels=len(self.task_features), - ) - for m in metrics - ] - metrics = MetricCollection(metrics) - - if isinstance(model_names, str): - model_names = [model_names] - elif not model_names: - model_names = self.pretrained_models + self.trained_models - - for model_name in model_names: - if model_name not in self.pretrained_models + self.trained_models: - LOGGER.warning( - "It seems you have neither trained the model nor \ - loaded a pretrained model.", - ) - - dataset = self.predict( - dataset, - model_name=model_name, - transforms=transforms, - prediction_column_prefix=prediction_column_prefix, - only_predictions=False, - splits_mapping=splits_mapping, - ) - - results = evaluate( - dataset=dataset, - metrics=metrics, - target_columns=self.task_target, - slice_spec=slice_spec, - prediction_column_prefix=prediction_column_prefix, - remove_columns=remove_columns, - split=splits_mapping["test"], - batch_size=batch_size, - fairness_config=fairness_config, - override_fairness_metrics=override_fairness_metrics, - ) - return results, dataset diff --git a/cyclops/tasks/utils.py b/cyclops/tasks/utils.py index a96209064..b09d652da 100644 --- a/cyclops/tasks/utils.py +++ b/cyclops/tasks/utils.py @@ -1,6 +1,6 @@ """Tasks utility functions.""" -from typing import Dict, List, Sequence, Union, get_args +from typing import Any, Callable, Dict, List, Sequence, Union, get_args import PIL from torchvision.transforms import PILToTensor @@ -31,7 +31,10 @@ ] -def apply_image_transforms(examples: Dict[str, List], transforms: callable) -> dict: +def apply_image_transforms( + examples: Dict[str, List[Any]], + transforms: Callable[[Any], Any], +) -> Dict[str, List[Any]]: """Apply transforms to examples. Used for applying image transformations to examples for chest X-ray classification. 
@@ -41,7 +44,7 @@ def apply_image_transforms(examples: Dict[str, List], transforms: callable) -> d # doing a conversion from PIL to tensor is necessary here when working # with the Image feature type. value_len = len(list(examples.values())[0]) - examples = [ + examples_ = [ { k: PILToTensor()(v[i]) if isinstance(v[i], PIL.Image.Image) else v[i] for k, v in examples.items() @@ -50,10 +53,10 @@ def apply_image_transforms(examples: Dict[str, List], transforms: callable) -> d ] # apply the transforms to each example - examples = [transforms(example) for example in examples] + examples_ = [transforms(example) for example in examples_] # convert back to a dict of lists - return {k: [d[k] for d in examples] for k in examples[0]} + return {k: [d[k] for d in examples_] for k in examples_[0]} def prepare_models( @@ -122,8 +125,8 @@ def prepare_models( # models contains a dictionary of model names and wrapped models elif isinstance(models, dict): assert all(isinstance(m, get_args(WrappedModel)) for m in models.values()) - models_dict = models + models_dict = models # type: ignore else: raise TypeError(f"Invalid model type: {type(models)}") - return models_dict + return models_dict # type: ignore diff --git a/tests/cyclops/tasks/test_cxr_classification.py b/tests/cyclops/tasks/test_cxr_classification.py deleted file mode 100644 index 40d95150e..000000000 --- a/tests/cyclops/tasks/test_cxr_classification.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Test MortalityPredictionTask.""" - -from unittest import TestCase - -from cyclops.models.catalog import create_model -from cyclops.tasks import CXRClassificationTask - - -class TestCXRClassificationTask(TestCase): - """Test CXRClassificationTask class.""" - - def setUp(self): - """Set up for testing.""" - self.model_name = "densenet" - self.model = create_model(self.model_name) - self.test_task = CXRClassificationTask( - {self.model_name: self.model}, - ) - - def test_init(self): - """Test initialization of CXRClassificationTask.""" - models_list = self.test_task.list_models() - assert models_list == [self.model_name] - - def test_add_model(self): - """Test adding a model to CXRClassificationTask.""" - self.test_task.add_model("resnet") - models_list = self.test_task.list_models() - assert models_list == [self.model_name, "resnet"] - - def test_get_model(self): - """Test getting a model from CXRClassificationTask.""" - model_name, model = self.test_task.get_model(self.model_name) - assert model_name == self.model_name - assert model == self.model diff --git a/tests/cyclops/tasks/test_mortality_prediction.py b/tests/cyclops/tasks/test_mortality_prediction.py deleted file mode 100644 index 12ef8164d..000000000 --- a/tests/cyclops/tasks/test_mortality_prediction.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Test MortalityPredictionTask.""" - -from unittest import TestCase - -from cyclops.models.catalog import create_model -from cyclops.tasks import MortalityPredictionTask - - -class TestMortalityPredictionTask(TestCase): - """Test MortalityPredictionTask class.""" - - def setUp(self): - """Set up for testing.""" - self.model_name = "mlp" - self.model = create_model(self.model_name) - self.test_task = MortalityPredictionTask( - {self.model_name: self.model}, - ) - - def test_init(self): - """Test initialization of MortalityPredictionTask.""" - models_list = self.test_task.list_models() - assert models_list == [self.model_name] - - def test_add_model(self): - """Test adding a model to MortalityPredictionTask.""" - self.test_task.add_model("rf_classifier") - models_list = 
self.test_task.list_models() - assert models_list == [self.model_name, "rf_classifier"] - - def test_get_model(self): - """Test getting a model from MortalityPredictionTask.""" - model_name, model = self.test_task.get_model(self.model_name) - assert model_name == self.model_name - assert model == self.model
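
Usage sketch: with the specialized task classes removed, code that previously used
MortalityPredictionTask can pass the same features and targets to the generic
BinaryTabularClassificationTask directly. This is a hedged, hypothetical example:
`train_ds` is a placeholder Hugging Face dataset, "mlp" is the model name used in
the deleted tests, the feature and target columns are the former
MortalityPredictionTask defaults, and the metric name is illustrative:

    from cyclops.models.catalog import create_model
    from cyclops.tasks import BinaryTabularClassificationTask

    task = BinaryTabularClassificationTask(
        {"mlp": create_model("mlp")},
        task_features=["age", "sex", "admission_type", "admission_location"],
        task_target=["outcome_death"],
    )
    task.train(train_ds, model_name="mlp")
    results, pred_ds = task.evaluate(train_ds, metrics=["accuracy"], model_names="mlp")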