diff --git a/luxonis_ml/data/augmentations/__init__.py b/luxonis_ml/data/augmentations/__init__.py
new file mode 100644
index 00000000..c771e20b
--- /dev/null
+++ b/luxonis_ml/data/augmentations/__init__.py
@@ -0,0 +1,11 @@
+from .custom import LetterboxResize, MixUp, Mosaic4
+from .utils import Augmentations, TrainAugmentations, ValAugmentations
+
+__all__ = [
+    "Augmentations",
+    "TrainAugmentations",
+    "ValAugmentations",
+    "LetterboxResize",
+    "MixUp",
+    "Mosaic4",
+]
diff --git a/luxonis_ml/data/augmentations/batch_compose.py b/luxonis_ml/data/augmentations/batch_compose.py
new file mode 100644
index 00000000..d289f6e4
--- /dev/null
+++ b/luxonis_ml/data/augmentations/batch_compose.py
@@ -0,0 +1,380 @@
+import random
+from typing import Any, Dict, List, Optional, Union, cast
+
+import numpy as np
+from albumentations.core.bbox_utils import (
+    BboxParams,
+    BboxProcessor,
+    DataProcessor,
+)
+from albumentations.core.composition import (
+    BaseCompose,
+    TransformsSeqType,
+    get_always_apply,
+)
+from albumentations.core.keypoints_utils import KeypointParams, KeypointsProcessor
+from albumentations.core.utils import get_shape
+
+from .batch_processors import BboxBatchProcessor, KeypointsBatchProcessor
+from .batch_utils import batch2list, concat_batches, list2batch, to_unbatched_name
+
+
+class Compose(BaseCompose):
+    def __init__(
+        self,
+        transforms: TransformsSeqType,
+        bbox_params: Optional[Union[dict, BboxParams]] = None,
+        keypoint_params: Optional[Union[dict, KeypointParams]] = None,
+        additional_targets: Optional[Dict[str, str]] = None,
+        p: float = 1.0,
+        is_check_shapes: bool = True,
+    ):
+        """Compose transforms and handle all transformations regarding bounding boxes.
+
+        @param transforms: List of transformations to compose
+        @type transforms: TransformsSeqType
+        @param bbox_params: Parameters for bounding box transforms. Defaults to None.
+        @type bbox_params: Optional[Union[dict, BboxParams]]
+        @param keypoint_params: Parameters for keypoint transforms. Defaults to None.
+        @type keypoint_params: Optional[Union[dict, KeypointParams]]
+        @param additional_targets: Dict mapping new target names to old target
+            names, e.g. {'image2': 'image'}. Defaults to None.
+        @type additional_targets: Optional[Dict[str, str]]
+        @param p: Probability of applying the whole list of transforms. Defaults to 1.0.
+        @type p: float
+        @param is_check_shapes: If True, consistency of the image/mask/masks shapes is
+            checked on each call. Pass False to disable this check (only do so if you
+            are sure your data is consistent). Defaults to True.
+ @type is_check_shapes: bool + """ + super(Compose, self).__init__(transforms, p) + + self.processors: Dict[str, DataProcessor] = {} + + if bbox_params: + if isinstance(bbox_params, dict): + b_params = BboxParams(**bbox_params) + elif isinstance(bbox_params, BboxParams): + b_params = bbox_params + else: + raise ValueError( + "unknown format of bbox_params, please use `dict` or `BboxParams`" + ) + self.processors["bboxes"] = self._get_bbox_processor( + b_params, additional_targets + ) + + if keypoint_params: + if isinstance(keypoint_params, dict): + k_params = KeypointParams(**keypoint_params) + elif isinstance(keypoint_params, KeypointParams): + k_params = keypoint_params + else: + raise ValueError( + "unknown format of keypoint_params, please use `dict` or `KeypointParams`" + ) + self.processors["keypoints"] = self._get_keypoints_processor( + k_params, additional_targets + ) + + if additional_targets is None: + additional_targets = {} + + self.additional_targets = additional_targets + + for proc in self.processors.values(): + proc.ensure_transforms_valid(self.transforms) + + self.add_targets(additional_targets) + + self.is_check_args = True + self._disable_check_args_for_transforms(self.transforms) + + self.is_check_shapes = is_check_shapes + + def _get_bbox_processor(self, b_params, additional_targets): + return BboxProcessor(b_params, additional_targets) + + def _get_keypoints_processor(self, k_params, additional_targets): + return KeypointsProcessor(k_params, additional_targets) + + @staticmethod + def _disable_check_args_for_transforms(transforms: TransformsSeqType) -> None: + for transform in transforms: + if isinstance(transform, BaseCompose): + Compose._disable_check_args_for_transforms(transform.transforms) + if isinstance(transform, Compose): + transform._disable_check_args() + + def _disable_check_args(self) -> None: + self.is_check_args = False + + def __call__(self, *args, force_apply: bool = False, **data) -> Dict[str, Any]: + if args: + raise KeyError( + "You have to pass data to augmentations as named arguments, for example: aug(image=image)" + ) + if self.is_check_args: + self._check_args(self.additional_targets, **data) + assert isinstance( + force_apply, (bool, int) + ), "force_apply must have bool or int type" + need_to_run = force_apply or random.random() < self.p + for p in self.processors.values(): + p.ensure_data_valid(data) + transforms = ( + self.transforms if need_to_run else get_always_apply(self.transforms) + ) + + check_each_transform = any( + getattr(item.params, "check_each_transform", False) + for item in self.processors.values() + ) + + for p in self.processors.values(): + p.preprocess(data) + + for _, t in enumerate(transforms): + data = t(**data) + if check_each_transform: + data = self._check_data_post_transform(data) + + data = self._make_targets_contiguous( + data + ) # ensure output targets are contiguous + + for p in self.processors.values(): + p.postprocess(data) + + return data + + def _check_data_post_transform(self, data: Dict[str, Any]) -> Dict[str, Any]: + rows, cols = get_shape(data["image"]) + + for p in self.processors.values(): + if not getattr(p.params, "check_each_transform", False): + continue + + for data_name in p.data_fields: + data[data_name] = p.filter(data[data_name], rows, cols) + return data + + def _to_dict(self) -> Dict[str, Any]: + dictionary = super(Compose, self)._to_dict() + bbox_processor = self.processors.get("bboxes") + keypoints_processor = self.processors.get("keypoints") + dictionary.update( + { + "bbox_params": 
bbox_processor.params._to_dict()
+                if bbox_processor
+                else None,  # skipcq: PYL-W0212
+                "keypoint_params": keypoints_processor.params._to_dict()  # skipcq: PYL-W0212
+                if keypoints_processor
+                else None,
+                "additional_targets": self.additional_targets,
+                "is_check_shapes": self.is_check_shapes,
+            }
+        )
+        return dictionary
+
+    def get_dict_with_id(self) -> Dict[str, Any]:
+        dictionary = super().get_dict_with_id()
+        bbox_processor = self.processors.get("bboxes")
+        keypoints_processor = self.processors.get("keypoints")
+        dictionary.update(
+            {
+                "bbox_params": bbox_processor.params._to_dict()
+                if bbox_processor
+                else None,  # skipcq: PYL-W0212
+                "keypoint_params": keypoints_processor.params._to_dict()  # skipcq: PYL-W0212
+                if keypoints_processor
+                else None,
+                "additional_targets": self.additional_targets,
+                "params": None,
+                "is_check_shapes": self.is_check_shapes,
+            }
+        )
+        return dictionary
+
+    def _check_args(self, additional_targets, **kwargs) -> None:
+        checked_single = ["image", "mask"]
+        checked_multi = ["masks"]
+        check_bbox_param = ["bboxes"]
+        # ["bboxes", "keypoints"] could be almost any type, no need to check them
+        shapes = []
+        for data_name, data in kwargs.items():
+            internal_data_name = additional_targets.get(data_name, data_name)
+            if internal_data_name in checked_single:
+                if not isinstance(data, np.ndarray):
+                    raise TypeError("{} must be numpy array type".format(data_name))
+                shapes.append(data.shape[:2])
+            if internal_data_name in checked_multi:
+                if data is not None:
+                    if not isinstance(data[0], np.ndarray):
+                        raise TypeError(
+                            "{} must be list of numpy arrays".format(data_name)
+                        )
+                    shapes.append(data[0].shape[:2])
+            if (
+                internal_data_name in check_bbox_param
+                and self.processors.get("bboxes") is None
+            ):
+                raise ValueError(
+                    "bbox_params must be specified for bbox transformations"
+                )
+
+        if self.is_check_shapes and shapes and shapes.count(shapes[0]) != len(shapes):
+            raise ValueError(
+                "Height and Width of image, mask or masks should be equal. You can disable shapes check "
+                "by setting a parameter is_check_shapes=False of Compose class (do it only if you are sure "
+                "about your data consistency)."
+            )
+
+    def _make_targets_contiguous(self, data: Dict[str, Any]) -> Dict[str, Any]:
+        result = {}
+        for key, value in data.items():
+            if isinstance(value, np.ndarray):
+                value = np.ascontiguousarray(value)
+            result[key] = value
+        return result
+
+
+class BatchCompose(Compose):
+    def __init__(
+        self,
+        transforms: TransformsSeqType,
+        bbox_params: Optional[Union[dict, BboxParams]] = None,
+        keypoint_params: Optional[Union[dict, KeypointParams]] = None,
+        additional_targets: Optional[Dict[str, str]] = None,
+        p: float = 1.0,
+        is_check_shapes: bool = True,
+    ):
+        """Compose designed to handle multi-image transforms. The contents can be
+        subclasses of `BatchBasedTransform` or other transforms enclosed in a
+        `ForEach` container. All target names must have the "_batch" suffix, e.g.
+        ("image_batch", "bboxes_batch"). Note that this naming rule also applies to
+        the `label_fields` of the `BboxParams` and `KeypointParams`.
+
+        @param transforms: List of transformations to compose
+        @type transforms: TransformsSeqType
+        @param bbox_params: Parameters for bounding box transforms. Defaults to None.
+        @type bbox_params: Optional[Union[dict, BboxParams]]
+        @param keypoint_params: Parameters for keypoint transforms. Defaults to None.
+        @type keypoint_params: Optional[Union[dict, KeypointParams]]
+        @param additional_targets: Dict mapping new target names to old target
+            names, e.g. {'image2': 'image'}. Defaults to None.
+        @type additional_targets: Optional[Dict[str, str]]
+        @param p: Probability of applying the whole list of transforms. Defaults to 1.0.
+        @type p: float
+        @param is_check_shapes: If True, consistency of the image/mask/masks shapes is
+            checked on each call. Pass False to disable this check (only do so if you
+            are sure your data is consistent). Defaults to True.
+        @type is_check_shapes: bool
+        """
+        super(BatchCompose, self).__init__(
+            transforms=transforms,
+            bbox_params=bbox_params,
+            keypoint_params=keypoint_params,
+            additional_targets=additional_targets,
+            p=p,
+            is_check_shapes=is_check_shapes,
+        )
+
+    def _get_bbox_processor(self, b_params, additional_targets):
+        return BboxBatchProcessor(b_params, additional_targets)
+
+    def _get_keypoints_processor(self, k_params, additional_targets):
+        return KeypointsBatchProcessor(k_params, additional_targets)
+
+    def _check_data_post_transform(
+        self, batched_data: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        datalist = batch2list(batched_data)
+        processed = []
+        for data in datalist:
+            rows, cols = get_shape(data["image"])
+            for p in self.processors.values():
+                if not getattr(p.params, "check_each_transform", False):
+                    continue
+                p = cast(Union[BboxBatchProcessor, KeypointsBatchProcessor], p)
+                for data_name in p.item_processor.data_fields:
+                    data[data_name] = p.filter(data[data_name], rows, cols)
+            processed.append(data)
+        return list2batch(processed)
+
+    def _check_args(self, additional_targets, **kwargs) -> None:
+        datalist = batch2list(kwargs)
+        unbatched_targets = {
+            to_unbatched_name(k): to_unbatched_name(v)
+            for k, v in additional_targets.items()
+        }
+        for data in datalist:
+            super(BatchCompose, self)._check_args(unbatched_targets, **data)
+
+    def _make_targets_contiguous(self, batched_data: Dict[str, Any]) -> Dict[str, Any]:
+        datalist = batch2list(batched_data)
+        if len(datalist) == 0:
+            return batched_data
+        processed = []
+        for data in datalist:
+            data = super(BatchCompose, self)._make_targets_contiguous(data)
+            processed.append(data)
+        return list2batch(processed)
+
+
+class ForEach(BaseCompose):
+    """Apply transforms to each batch element individually. This expects batched
+    input and can be contained by a `BatchCompose`."""
+
+    def __init__(self, transforms: TransformsSeqType, p: float = 0.5):
+        super().__init__(transforms, p)
+
+    def __call__(
+        self, *args, force_apply: bool = False, **batched_data
+    ) -> Dict[str, List]:
+        datalist = batch2list(batched_data)
+        processed = []
+        for data in datalist:
+            for t in self.transforms:
+                data = t(force_apply=force_apply, **data)
+            processed.append(data)
+        batched_data = list2batch(processed)
+        return batched_data
+
+    def add_targets(self, additional_targets: Optional[Dict[str, str]]) -> None:
+        if additional_targets:
+            unbatched_targets = {
+                to_unbatched_name(k): to_unbatched_name(v)
+                for k, v in additional_targets.items()
+            }
+            for t in self.transforms:
+                t.add_targets(unbatched_targets)
+
+
+class Repeat(BaseCompose):
+    """Apply transforms repeatedly and concatenate the output batches.
+
+    This expects batched input and can be contained by a `BatchCompose`.
+    The contained transforms should be subclasses of `BatchBasedTransform`.
+    Internally, this container applies the contained transforms to the input
+    batch `n` times and concatenates the resulting batches, as illustrated below.
+    Note: This class assumes that each transform does not modify the input data.
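+
+    Example (illustrative; `SomeBatchTransform` stands in for any
+    `BatchBasedTransform` subclass that preserves the batch size):
+        >>> repeat = Repeat([SomeBatchTransform(...)], n=2)
+        >>> out = repeat(image_batch=[img], force_apply=True)
+        >>> len(out["image_batch"])  # n times the input batch size
+        2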
+ """ + + def __init__(self, transforms: TransformsSeqType, n: int, p: float = 0.5): + super().__init__(transforms, p) + if n <= 0: + raise ValueError("Repetition `n` should be larger than 0") + self.n = n + + def __call__( + self, *args, force_apply: bool = False, **batched_data + ) -> Dict[str, List]: + processed = [] + for _ in range(self.n): + image = batched_data["image_batch"][0].copy() + data = batched_data + for t in self.transforms: + data = t(force_apply=force_apply, **data) + processed.append(data) + assert np.all(batched_data["image_batch"][0] == image) + return concat_batches(processed) diff --git a/luxonis_ml/data/augmentations/batch_processors.py b/luxonis_ml/data/augmentations/batch_processors.py new file mode 100644 index 00000000..88ffa756 --- /dev/null +++ b/luxonis_ml/data/augmentations/batch_processors.py @@ -0,0 +1,154 @@ +import copy +from typing import Any, Dict, Optional, Sequence + +from albumentations.core.bbox_utils import BboxParams, BboxProcessor +from albumentations.core.keypoints_utils import KeypointParams, KeypointsProcessor +from albumentations.core.utils import DataProcessor + +from .batch_utils import batch2list, list2batch, to_unbatched_name + + +class BboxBatchProcessor(DataProcessor): + def __init__( + self, params: BboxParams, additional_targets: Optional[Dict[str, str]] = None + ): + """Data processor class to process bbox data in batches. + + @param params: Bbox parameters + @type params: BboxParams + @param additional_targets: Additional targets of the transform. Defaults to + None. + @type additional_targets: Optional[Dict[str, str]] + """ + super().__init__(params, additional_targets) + item_params = copy.deepcopy(params) + if item_params.label_fields is not None: + label_fields = item_params.label_fields + item_params.label_fields = [ + to_unbatched_name(field) for field in label_fields + ] + self.item_processor = BboxProcessor(item_params, additional_targets) + + @property + def default_data_name(self) -> str: + return "bboxes_batch" + + def ensure_data_valid(self, data: Dict[str, Any]) -> None: + for item in batch2list(data): + self.item_processor.ensure_data_valid(item) + + def postprocess(self, data: Dict[str, Any]) -> Dict[str, Any]: + processed = [self.item_processor.postprocess(item) for item in batch2list(data)] + procesed_data = list2batch(processed) + for k in data.keys(): + data[k] = procesed_data[k] + return data + + def preprocess(self, data: Dict[str, Any]) -> None: + processed = batch2list(data) + for item in processed: + self.item_processor.preprocess(item) + procesed_data = list2batch(processed) + for k in data.keys(): + data[k] = procesed_data[k] + + def filter_batch(self, batched_data: Dict[str, Any]) -> Dict[str, Any]: + processed = [] + for data in batch2list(batched_data): + rows, cols = data["image"][:2] + for data_name in self.item_processor.data_fields: + data[data_name] = self.item_processor.filter( + data[data_name], rows, cols + ) + processed.append(data) + return list2batch(processed) + + def filter(self, data: Sequence, rows: int, cols: int) -> Sequence: + return self.item_processor.filter(data, rows, cols) + + def check(self, data: Sequence, rows: int, cols: int) -> None: + return self.item_processor.check(data, rows, cols) + + def convert_to_albumentations( + self, data: Sequence, rows: int, cols: int + ) -> Sequence: + return self.item_processor.convert_to_albumentations(data, rows, cols) + + def convert_from_albumentations( + self, data: Sequence, rows: int, cols: int + ) -> Sequence: + return 
+
+
+class KeypointsBatchProcessor(DataProcessor):
+    def __init__(
+        self,
+        params: KeypointParams,
+        additional_targets: Optional[Dict[str, str]] = None,
+    ):
+        """Data processor class to process keypoint data in batches.
+
+        @param params: Keypoint parameters
+        @type params: KeypointParams
+        @param additional_targets: Additional targets of the transform. Defaults to
+            None.
+        @type additional_targets: Optional[Dict[str, str]]
+        """
+        super().__init__(params, additional_targets)
+        item_params = copy.deepcopy(params)
+        if item_params.label_fields is not None:
+            label_fields = item_params.label_fields
+            item_params.label_fields = [
+                to_unbatched_name(field) for field in label_fields
+            ]
+        self.item_processor = KeypointsProcessor(item_params, additional_targets)
+
+    @property
+    def default_data_name(self) -> str:
+        return "keypoints_batch"
+
+    def ensure_data_valid(self, data: Dict[str, Any]) -> None:
+        for item in batch2list(data):
+            self.item_processor.ensure_data_valid(item)
+
+    def postprocess(self, data: Dict[str, Any]) -> Dict[str, Any]:
+        processed = [self.item_processor.postprocess(item) for item in batch2list(data)]
+        processed_data = list2batch(processed)
+        for k in data.keys():
+            data[k] = processed_data[k]
+        return data
+
+    def preprocess(self, data: Dict[str, Any]) -> None:
+        processed = batch2list(data)
+        for item in processed:
+            self.item_processor.preprocess(item)
+        processed_data = list2batch(processed)
+        for k in data.keys():
+            data[k] = processed_data[k]
+
+    def filter_batch(self, batched_data: Dict[str, Any]) -> Dict[str, Any]:
+        processed = []
+        for data in batch2list(batched_data):
+            rows, cols = data["image"].shape[:2]
+            for data_name in self.item_processor.data_fields:
+                data[data_name] = self.item_processor.filter(
+                    data[data_name], rows, cols
+                )
+            processed.append(data)
+        return list2batch(processed)
+
+    def filter(self, data: Sequence, rows: int, cols: int) -> Sequence:
+        return self.item_processor.filter(data, rows, cols)
+
+    def check(self, data: Sequence, rows: int, cols: int) -> None:
+        return self.item_processor.check(data, rows, cols)
+
+    def convert_to_albumentations(
+        self, data: Sequence, rows: int, cols: int
+    ) -> Sequence:
+        return self.item_processor.convert_to_albumentations(data, rows, cols)
+
+    def convert_from_albumentations(
+        self, data: Sequence, rows: int, cols: int
+    ) -> Sequence:
+        return self.item_processor.convert_from_albumentations(data, rows, cols)
diff --git a/luxonis_ml/data/augmentations/batch_transform.py b/luxonis_ml/data/augmentations/batch_transform.py
new file mode 100644
index 00000000..a5dd32ed
--- /dev/null
+++ b/luxonis_ml/data/augmentations/batch_transform.py
@@ -0,0 +1,67 @@
+from typing import Any, Callable, Dict, List, Sequence
+
+import numpy as np
+from albumentations.core.transforms_interface import (
+    BasicTransform,
+    BoxType,
+    KeypointType,
+)
+
+
+class BatchBasedTransform(BasicTransform):
+    def __init__(self, batch_size: int, **kwargs):
+        """Transform applied to a batch of images.
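+
+        Subclasses implement the `apply_to_*_batch` methods below and receive
+        all targets under batched names with the "_batch" suffix (e.g.
+        `image_batch`), as listed in the `targets` property.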
+
+        @param batch_size: Batch size needed for augmentation to work
+        @type batch_size: int
+        @param kwargs: Additional BasicTransform parameters
+        @type kwargs: Any
+        """
+        super().__init__(**kwargs)
+
+        self.batch_size = batch_size
+
+    @property
+    def targets(self) -> Dict[str, Callable]:
+        return {
+            "image_batch": self.apply_to_image_batch,
+            "mask_batch": self.apply_to_mask_batch,
+            "bboxes_batch": self.apply_to_bboxes_batch,
+            "keypoints_batch": self.apply_to_keypoints_batch,
+        }
+
+    def update_params(self, params: Dict[str, Any], **kwargs) -> Dict[str, Any]:
+        # This overwrites the `super().update_params(...)`
+        return params
+
+    def apply_to_image_batch(
+        self, image_batch: Sequence[np.ndarray], **params
+    ) -> List[np.ndarray]:
+        raise NotImplementedError(
+            "Method apply_to_image_batch is not implemented in class "
+            + self.__class__.__name__
+        )
+
+    def apply_to_mask_batch(
+        self, mask_batch: Sequence[np.ndarray], **params
+    ) -> List[np.ndarray]:
+        raise NotImplementedError(
+            "Method apply_to_mask_batch is not implemented in class "
+            + self.__class__.__name__
+        )
+
+    def apply_to_bboxes_batch(
+        self, bboxes_batch: Sequence[BoxType], **params
+    ) -> List[BoxType]:
+        raise NotImplementedError(
+            "Method apply_to_bboxes_batch is not implemented in class "
+            + self.__class__.__name__
+        )
+
+    def apply_to_keypoints_batch(
+        self, keypoints_batch: Sequence[KeypointType], **params
+    ) -> List[KeypointType]:
+        raise NotImplementedError(
+            "Method apply_to_keypoints_batch is not implemented in class "
+            + self.__class__.__name__
+        )
diff --git a/luxonis_ml/data/augmentations/batch_utils.py b/luxonis_ml/data/augmentations/batch_utils.py
new file mode 100644
index 00000000..126d1c28
--- /dev/null
+++ b/luxonis_ml/data/augmentations/batch_utils.py
@@ -0,0 +1,73 @@
+from typing import Any, Dict, List
+
+
+def batch2list(data: Dict[str, List]) -> List[Dict[str, Any]]:
+    """Convert from a batched target dict to a list of normal target dicts."""
+    if "image_batch" not in data:
+        raise ValueError("Batch-based transform should have `image_batch` target")
+    batch_size = len(data["image_batch"])
+    items = []
+    for i in range(batch_size):
+        item = {}
+        for k, v in data.items():
+            if k.endswith("_batch"):
+                # e.g. {"image_batch": image_batch} -> {"image": image_batch[i]}
{"image_batch": image_batch} -> {"image": image_batch[i]} + item_k = to_unbatched_name(k) + item[item_k] = v[i] + else: + raise ValueError(f"All key must have '_batch' suffix, got `{k}`") + items.append(item) + return items + + +def list2batch(data: List[Dict[str, Any]]) -> Dict[str, List]: + """Convert from a list of normal target dicts to a batched target dict.""" + + if len(data) == 0: + raise ValueError("The input should have at least one item.") + + item = data[0] + batch: Dict[str, Any] = {f"{k}_batch": [] for k in item.keys()} + for item in data: + for k, v in item.items(): + batch_k = to_batched_name(k) + batch[batch_k].append(v) + + return batch + + +def to_unbatched_name(batched_name: str) -> str: + """Get a normal target name from a batched target name If the given name does not + have "_batched" suffix, ValueError will be raised.""" + if not batched_name.endswith("_batch"): + raise ValueError( + f"Batched target name must have '_batch' suffix, got `{batched_name}`" + ) + return batched_name.replace("_batch", "") + + +def to_batched_name(name: str) -> str: + """Get a unbatched target name from a normal target name If the given name already + has had "_batched" suffix, ValueError will be raised.""" + + if name.endswith("_batch"): + raise ValueError( + f"Non batched target name must not have '_batch' suffix, got `{name}`" + ) + return f"{name}_batch" + + +def concat_batches(batches: List[Dict[str, List]]) -> Dict[str, List]: + """Concatenate batched targets.""" + + n_batches = len(batches) + if n_batches == 0: + raise ValueError("The input should have at least one item.") + + keys = list(batches[0].keys()) + out_batch: Dict[str, List] = {k: [] for k in keys} + for batch in batches: + for k in keys: + for item in batch[k]: + out_batch[k].append(item) + return out_batch diff --git a/luxonis_ml/data/augmentations/custom/__init__.py b/luxonis_ml/data/augmentations/custom/__init__.py new file mode 100644 index 00000000..29563f03 --- /dev/null +++ b/luxonis_ml/data/augmentations/custom/__init__.py @@ -0,0 +1,5 @@ +from .letterbox_resize import LetterboxResize +from .mixup import MixUp +from .mosaic import Mosaic4 + +__all__ = ["LetterboxResize", "MixUp", "Mosaic4"] diff --git a/luxonis_ml/data/augmentations/custom/letterbox_resize.py b/luxonis_ml/data/augmentations/custom/letterbox_resize.py new file mode 100644 index 00000000..ccdad265 --- /dev/null +++ b/luxonis_ml/data/augmentations/custom/letterbox_resize.py @@ -0,0 +1,289 @@ +from typing import Any, Dict, Tuple + +import cv2 +import numpy as np +from albumentations import BoxType, DualTransform, KeypointType +from albumentations.core.bbox_utils import denormalize_bbox, normalize_bbox + +from ..utils import AUGMENTATIONS + + +@AUGMENTATIONS.register_module() +class LetterboxResize(DualTransform): + def __init__( + self, + height: int, + width: int, + interpolation: int = cv2.INTER_LINEAR, + border_value: int = 0, + mask_value: int = 0, + always_apply: bool = False, + p: float = 1.0, + ): + """Augmentation to apply letterbox resizing to images. Also transforms masks, + bboxes and keypoints to correct shape. + + @param height: Desired height of the output. + @type height: int + @param width: Desired width of the output. + @type width: int + @param interpolation: Cv2 flag to specify interpolation used when resizing. + Defaults to cv2.INTER_LINEAR. + @type interpolation: int, optional + @param border_value: Padding value for images. Defaults to 0. + @type border_value: int, optional + @param mask_value: Padding value for masks. 
Defaults to 0. + @type mask_value: int, optional + @param always_apply: Whether to always apply the transform. Defaults to False. + @type always_apply: bool, optional + @param p: Probability of applying the transform. Defaults to 1.0. + @type p: float, optional + """ + + super().__init__(always_apply, p) + + if not (0 <= border_value <= 255): + raise ValueError("Border value must be in range [0,255].") + + if not (0 <= mask_value <= 255): + raise ValueError("Mask value must be in range [0,255].") + + self.height = height + self.width = width + self.interpolation = interpolation + self.border_value = border_value + self.mask_value = mask_value + + def update_params(self, params: Dict[str, Any], **kwargs) -> Dict[str, Any]: + """Updates augmentation parameters with the necessary metadata. + + @param params: The existing augmentation parameters dictionary. + @type params: Dict[str, Any] + @param kwargs: Additional keyword arguments to add the parameters. + @type kwargs: Any + @return: Updated dictionary containing the merged parameters. + @rtype: Dict[str, Any] + """ + + params = super().update_params(params, **kwargs) + + img_height = params["rows"] + img_width = params["cols"] + + ratio = min(self.height / img_height, self.width / img_width) + new_height = int(img_height * ratio) + new_width = int(img_width * ratio) + + # only supports center alignment + pad_top = (self.height - new_height) // 2 + pad_bottom = pad_top + + pad_left = (self.width - new_width) // 2 + pad_right = pad_left + + params.update( + { + "pad_top": pad_top, + "pad_bottom": pad_bottom, + "pad_left": pad_left, + "pad_right": pad_right, + } + ) + + return params + + def apply( + self, + img: np.ndarray, + pad_top: int, + pad_bottom: int, + pad_left: int, + pad_right: int, + **kwargs, + ) -> np.ndarray: + """Applies the letterbox augmentation to an image. + + @param img: Input image to which resize is applied. + @type img: np.ndarray + @param pad_top: Number of pixels to pad at the top. + @type pad_top: int + @param pad_bottom: Number of pixels to pad at the bottom. + @type pad_bottom: int + @param pad_left: Number of pixels to pad on the left. + @type pad_left: int + @param pad_right: Number of pixels to pad on the right. + @type pad_right: int + @param params: Additional parameters for the padding operation. + @type params: Any + @return: Image with applied letterbox resize. + @rtype: np.ndarray + """ + + resized_img = cv2.resize( + img, + (self.width - pad_left - pad_right, self.height - pad_top - pad_bottom), + interpolation=self.interpolation, + ) + img_out = cv2.copyMakeBorder( + resized_img, + pad_top, + pad_bottom, + pad_left, + pad_right, + cv2.BORDER_CONSTANT, + self.border_value, + ) + img_out = img_out.astype(img.dtype) + return img_out + + def apply_to_mask( + self, + img: np.ndarray, + pad_top: int, + pad_bottom: int, + pad_left: int, + pad_right: int, + **params, + ) -> np.ndarray: + """Applies letterbox augmentation to the input mask. + + @param img: Input mask to which resize is applied. + @type img: np.ndarray + @param pad_top: Number of pixels to pad at the top. + @type pad_top: int + @param pad_bottom: Number of pixels to pad at the bottom. + @type pad_bottom: int + @param pad_left: Number of pixels to pad on the left. + @type pad_left: int + @param pad_right: Number of pixels to pad on the right. + @type pad_right: int + @param params: Additional parameters for the padding operation. + @type params: Any + @return: Mask with applied letterbox resize. 
+ @rtype: np.ndarray + """ + + resized_img = cv2.resize( + img, + (self.width - pad_left - pad_right, self.height - pad_top - pad_bottom), + interpolation=cv2.INTER_NEAREST, + ) + img_out = cv2.copyMakeBorder( + resized_img, + pad_top, + pad_bottom, + pad_left, + pad_right, + cv2.BORDER_CONSTANT, + self.mask_value, + ) + img_out = img_out.astype(img.dtype) + return img_out + + def apply_to_bbox( + self, + bbox: BoxType, + pad_top: int, + pad_bottom: int, + pad_left: int, + pad_right: int, + **params, + ) -> BoxType: + """Applies letterbox augmentation to the bounding box. + + @param img: Bounding box to which resize is applied. + @type img: BoxType + @param pad_top: Number of pixels to pad at the top. + @type pad_top: int + @param pad_bottom: Number of pixels to pad at the bottom. + @type pad_bottom: int + @param pad_left: Number of pixels to pad on the left. + @type pad_left: int + @param pad_right: Number of pixels to pad on the right. + @type pad_right: int + @param params: Additional parameters for the padding operation. + @type params: Any + @return: Bounding box with applied letterbox resize. + @rtype: BoxType + """ + + x_min, y_min, x_max, y_max = denormalize_bbox( + bbox, self.height - pad_top - pad_bottom, self.width - pad_left - pad_right + )[:4] + bbox = np.array( + [x_min + pad_left, y_min + pad_top, x_max + pad_left, y_max + pad_top] + ) + # clip bbox to image, ignoring padding + bbox = bbox.clip( + min=[pad_left, pad_top] * 2, + max=[self.width - pad_left, self.height - pad_top] * 2, + ).tolist() + return normalize_bbox(bbox, self.height, self.width) + + def apply_to_keypoint( + self, + keypoint: KeypointType, + pad_top: int, + pad_bottom: int, + pad_left: int, + pad_right: int, + **params, + ) -> KeypointType: + """Applies letterbox augmentation to the keypoint. + + @param img: Keypoint to which resize is applied. + @type img: KeypointType + @param pad_top: Number of pixels to pad at the top. + @type pad_top: int + @param pad_bottom: Number of pixels to pad at the bottom. + @type pad_bottom: int + @param pad_left: Number of pixels to pad on the left. + @type pad_left: int + @param pad_right: Number of pixels to pad on the right. + @type pad_right: int + @param params: Additional parameters for the padding operation. + @type params: Any + @return: Keypoint with applied letterbox resize. + @rtype: KeypointType + """ + + x, y, angle, scale = keypoint[:4] + scale_x = (self.width - pad_left - pad_right) / params["cols"] + scale_y = (self.height - pad_top - pad_bottom) / params["rows"] + new_x = (x * scale_x) + pad_left + new_y = (y * scale_y) + pad_top + # if keypoint is in the padding then set coordinates to -1 + out_keypoint = ( + new_x + if not self._out_of_bounds(new_x, pad_left, self.width - pad_left) + else -1, + new_y + if not self._out_of_bounds(new_y, pad_top, self.height - pad_top) + else -1, + angle, + scale * max(scale_x, scale_y), + ) + return out_keypoint + + def get_transform_init_args_names(self) -> Tuple[str, ...]: + """Gets the default arguments for the letterbox augmentation. + + @return: The string keywords of the arguments. + @rtype: Tuple[str, ...] + """ + + return ("height", "width", "interpolation", "border_value", "mask_value") + + def _out_of_bounds(self, value: float, min_limit: float, max_limit: float) -> bool: + """ "Check if the given value is outside the specified limits. + + @param value: The value to be checked. + @type value: float + @param min_limit: Minimum limit. + @type min_limit: float + @param max_limit: Maximum limit. 
+ @type max_limit: float + @return: True if the value is outside the specified limits, False otherwise. + @rtype: bool + """ + return value < min_limit or value > max_limit diff --git a/luxonis_ml/data/augmentations/custom/mixup.py b/luxonis_ml/data/augmentations/custom/mixup.py new file mode 100644 index 00000000..b788c705 --- /dev/null +++ b/luxonis_ml/data/augmentations/custom/mixup.py @@ -0,0 +1,170 @@ +import random +from typing import Any, Dict, List, Tuple, Union + +import albumentations as A +import cv2 +import numpy as np +from albumentations import BoxType, KeypointType + +from ..batch_transform import BatchBasedTransform +from ..utils import AUGMENTATIONS + + +@AUGMENTATIONS.register_module() +class MixUp(BatchBasedTransform): + def __init__( + self, + alpha: Union[float, Tuple[float, float]] = 0.5, + out_batch_size: int = 1, + always_apply: bool = False, + p: float = 0.5, + ): + """MixUp augmentation that merges two images and their annotations into one. If + images are not of same size then second one is first resized to match the first + one. + + @param alpha: Mixing coefficient, either a single float or a tuple representing + the range. Defaults to 0.5. + @type alpha: Union[float, Tuple[float, float]], optional + @param out_batch_size: Number of output images in the batch. Defaults to 1. + @type out_batch_size: int, optional + @param always_apply: Whether to always apply the transform. Defaults to False. + @type always_apply: bool, optional + @param p: Probability of applying the transform. Defaults to 0.5. + @type p: float, optional + """ + super().__init__(batch_size=2, always_apply=always_apply, p=p) + + self.alpha = alpha + self.out_batch_size = out_batch_size + + def get_transform_init_args_names(self) -> Tuple[str, ...]: + """Gets the default arguments for the mixup augmentation. + + @return: The string keywords of the arguments. + @rtype: Tuple[str, ...] + """ + return ("alpha", "out_batch_size") + + @property + def targets_as_params(self) -> List[str]: + """List of augmentation targets. + + @return: Output list of augmentation targets. + @rtype: List[str] + """ + return ["image_batch"] + + def apply_to_image_batch( + self, + image_batch: List[np.ndarray], + image_shapes: List[Tuple[int, int]], + **params, + ) -> List[np.ndarray]: + """Applies the transformation to a batch of images. + + @param image_batch: Batch of input images to which the transformation is + applied. + @type image_batch: List[np.ndarray] + @param image_shapes: Shapes of the input images in the batch. + @type image_shapes: List[Tuple[int, int]] + @param params: Additional parameters for the transformation. + @type params: Any + @return: List of transformed images. + @rtype: List[np.ndarray] + """ + image1 = image_batch[0] + # resize second image to size of the first one + image2 = cv2.resize(image_batch[1], (image_shapes[0][1], image_shapes[0][0])) + + if isinstance(self.alpha, float): + curr_alpha = np.clip(self.alpha, 0, 1) + else: + curr_alpha = random.uniform(max(self.alpha[0], 0), min(self.alpha[1], 1)) + img_out = cv2.addWeighted(image1, curr_alpha, image2, 1 - curr_alpha, 0.0) + return [img_out] + + def apply_to_mask_batch( + self, + mask_batch: List[np.ndarray], + image_shapes: List[Tuple[int, int]], + **params, + ) -> List[np.ndarray]: + """Applies the transformation to a batch of masks. + + @param image_batch: Batch of input masks to which the transformation is applied. + @type image_batch: List[np.ndarray] + @param image_shapes: Shapes of the input images in the batch. 
+        @type image_shapes: List[Tuple[int, int]]
+        @param params: Additional parameters for the transformation.
+        @type params: Any
+        @return: List of transformed masks.
+        @rtype: List[np.ndarray]
+        """
+        mask1 = mask_batch[0]
+        mask2 = cv2.resize(
+            mask_batch[1],
+            (image_shapes[0][1], image_shapes[0][0]),
+            interpolation=cv2.INTER_NEAREST,
+        )
+        out_mask = mask1 + mask2
+        # if the masks intersect, keep the values from the first mask
+        mask_inter = mask1 > 0
+        out_mask[mask_inter] = mask1[mask_inter]
+        return [out_mask]
+
+    def apply_to_bboxes_batch(
+        self, bboxes_batch: List[BoxType], image_shapes: List[Tuple[int, int]], **params
+    ) -> List[BoxType]:
+        """Applies the transformation to a batch of bboxes.
+
+        @param bboxes_batch: Batch of input bboxes to which the transformation is
+            applied.
+        @type bboxes_batch: List[BoxType]
+        @param image_shapes: Shapes of the input images in the batch.
+        @type image_shapes: List[Tuple[int, int]]
+        @param params: Additional parameters for the transformation.
+        @type params: Any
+        @return: List of transformed bboxes.
+        @rtype: List[BoxType]
+        """
+        return [bboxes_batch[0] + bboxes_batch[1]]
+
+    def apply_to_keypoints_batch(
+        self,
+        keypoints_batch: List[KeypointType],
+        image_shapes: List[Tuple[int, int]],
+        **params,
+    ) -> List[KeypointType]:
+        """Applies the transformation to a batch of keypoints.
+
+        @param keypoints_batch: Batch of input keypoints to which the transformation
+            is applied.
+        @type keypoints_batch: List[KeypointType]
+        @param image_shapes: Shapes of the input images in the batch.
+        @type image_shapes: List[Tuple[int, int]]
+        @param params: Additional parameters for the transformation.
+        @type params: Any
+        @return: List of transformed keypoints.
+        @rtype: List[KeypointType]
+        """
+        scaled_kpts2 = []
+        scale_x = image_shapes[0][1] / image_shapes[1][1]
+        scale_y = image_shapes[0][0] / image_shapes[1][0]
+        for kpt in keypoints_batch[1]:
+            new_kpt = A.augmentations.geometric.functional.keypoint_scale(
+                keypoint=kpt, scale_x=scale_x, scale_y=scale_y
+            )
+            scaled_kpts2.append(new_kpt + kpt[4:])
+        return [keypoints_batch[0] + scaled_kpts2]
+
+    def get_params_dependent_on_targets(self, params: Dict[str, Any]) -> Dict[str, Any]:
+        """Get parameters dependent on the targets.
+
+        @param params: Dictionary containing parameters.
+        @type params: Dict[str, Any]
+        @return: Dictionary containing parameters dependent on the targets.
+        @rtype: Dict[str, Any]
+        """
+        image_batch = params["image_batch"]
+        return {"image_shapes": [image.shape[:2] for image in image_batch]}
diff --git a/luxonis_ml/data/augmentations/custom/mosaic.py b/luxonis_ml/data/augmentations/custom/mosaic.py
new file mode 100644
index 00000000..62a5a6eb
--- /dev/null
+++ b/luxonis_ml/data/augmentations/custom/mosaic.py
@@ -0,0 +1,458 @@
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+import numpy as np
+from albumentations import BoxType, KeypointType
+from albumentations.core.bbox_utils import (
+    denormalize_bbox,
+    normalize_bbox,
+)
+from albumentations.core.transforms_interface import (
+    BoxInternalType,
+    ImageColorType,
+    KeypointInternalType,
+)
+
+from ..batch_transform import BatchBasedTransform
+from ..utils import AUGMENTATIONS
+
+
+@AUGMENTATIONS.register_module()
+class Mosaic4(BatchBasedTransform):
+    def __init__(
+        self,
+        out_height: int,
+        out_width: int,
+        value: Optional[Union[int, float, List[int], List[float]]] = None,
+        out_batch_size: int = 1,
+        mask_value: Optional[Union[int, float, List[int], List[float]]] = None,
+        always_apply: bool = False,
+        p: float = 0.5,
+    ):
+        """Mosaic augmentation that arranges four selected images into a single image
+        in a 2x2 grid layout. This is done deterministically, meaning the first image
+        in the batch is always placed in the top left. The input images should have
+        the same number of channels but can have different widths and heights. The
+        output is cropped around the intersection point of the four images with the
+        size (out_width x out_height). If the mosaic image is smaller than
+        out_width x out_height, the gap is filled by the `value`.
+
+        @param out_height: Output image height. The mosaic image is cropped by this height around the mosaic center.
+            If the size of the mosaic image is smaller than this value the gap is filled by the `value`.
+        @type out_height: int
+
+        @param out_width: Output image width. The mosaic image is cropped by this width around the mosaic center.
+            If the size of the mosaic image is smaller than this value the gap is filled by the `value`.
+        @type out_width: int
+
+        @param value: Padding value. Defaults to None.
+        @type value: Optional[Union[int, float, List[int], List[float]]], optional
+
+        @param out_batch_size: Number of output images in the batch. Defaults to 1.
+        @type out_batch_size: int, optional
+
+        @param mask_value: Padding value for masks. Defaults to None.
+        @type mask_value: Optional[Union[int, float, List[int], List[float]]], optional
+
+        @param always_apply: Whether to always apply the transform. Defaults to False.
+        @type always_apply: bool, optional
+
+        @param p: Probability of applying the transform. Defaults to 0.5.
+        @type p: float, optional
+        """
+
+        super().__init__(batch_size=4, always_apply=always_apply, p=p)
+
+        if out_height <= 0:
+            raise ValueError(f"out_height should be larger than 0, got {out_height}")
+        if out_width <= 0:
+            raise ValueError(f"out_width should be larger than 0, got {out_width}")
+        if out_batch_size <= 0:
+            raise ValueError(
+                f"out_batch_size should be larger than 0, got {out_batch_size}"
+            )
+
+        self.n_tiles = self.batch_size  # 4: 2x2
+        self.out_height = out_height
+        self.out_width = out_width
+        self.value = value
+        self.mask_value = mask_value
+        self.out_batch_size = out_batch_size
+
+    def get_transform_init_args_names(self) -> Tuple[str, ...]:
+        """Gets the default arguments for the mosaic augmentation.
+
+        @return: The string keywords of the arguments.
+        @rtype: Tuple[str, ...]
+ """ + return ( + "out_height", + "out_width", + "replace", + "value", + "out_batch_size", + "mask_value", + ) + + @property + def targets_as_params(self): + """List of augmentation targets. + + @return: Output list of augmentation targets. + @rtype: List[str] + """ + return ["image_batch"] + + def apply_to_image_batch( + self, image_batch: List[np.ndarray], indices: List[int], **params + ) -> List[np.ndarray]: + """Applies the transformation to a batch of images. + + @param image_batch: Batch of input images to which the transformation is + applied. + @type image_batch: List[np.ndarray] + @param indices: Indices of images in the batch. + @type indices: List[Tuple[int, int]] + @param params: Additional parameters for the transformation. + @type params: Any + @return: List of transformed images. + @rtype: List[np.ndarray] + """ + output_batch = [] + for i_batch in range(self.out_batch_size): + idx_chunk = indices[self.n_tiles * i_batch : self.n_tiles * (i_batch + 1)] + image_chunk = [image_batch[i] for i in idx_chunk] + mosaiced = mosaic4(image_chunk, self.out_height, self.out_width, self.value) + output_batch.append(mosaiced) + return output_batch + + def apply_to_mask_batch( + self, mask_batch: List[np.ndarray], indices: List[int], **params + ) -> List[np.ndarray]: + """Applies the transformation to a batch of masks. + + @param mask_batch: Batch of input masks to which the transformation is applied. + @type mask_batch: List[np.ndarray] + @param indices: Indices of images in the batch. + @type indices: List[Tuple[int, int]] + @param params: Additional parameters for the transformation. + @type params: Any + @return: List of transformed masks. + @rtype: List[np.ndarray] + """ + output_batch = [] + for i_batch in range(self.out_batch_size): + idx_chunk = indices[self.n_tiles * i_batch : self.n_tiles * (i_batch + 1)] + mask_chunk = [mask_batch[i] for i in idx_chunk] + mosaiced = mosaic4( + mask_chunk, self.out_height, self.out_width, self.mask_value + ) + output_batch.append(mosaiced) + return output_batch + + def apply_to_bboxes_batch( + self, + bboxes_batch: List[BoxType], + indices: List[int], + image_shapes: List[Tuple[int, int]], + **params, + ) -> List[BoxType]: + """Applies the transformation to a batch of bboxes. + + @param bboxes_batch: Batch of input bboxes to which the transformation is + applied. + @type bboxes_batch: List[BboxType] + @param indices: Indices of images in the batch. + @type indices: List[Tuple[int, int]] + @param image_shapes: Shapes of the input images in the batch. + @type image_shapes: List[Tuple[int, int]] + @param params: Additional parameters for the transformation. + @type params: Any + @return: List of transformed bboxes. + @rtype: List[BoxType] + """ + output_batch = [] + for i_batch in range(self.out_batch_size): + idx_chunk = indices[self.n_tiles * i_batch : self.n_tiles * (i_batch + 1)] + bboxes_chunk = [bboxes_batch[i] for i in idx_chunk] + shape_chunk = [image_shapes[i] for i in idx_chunk] + new_bboxes = [] + for i in range(self.n_tiles): + bboxes = bboxes_chunk[i] + rows, cols = shape_chunk[i] + for bbox in bboxes: + new_bbox = bbox_mosaic4( + bbox[:4], rows, cols, i, self.out_height, self.out_width + ) + new_bboxes.append(tuple(new_bbox) + tuple(bbox[4:])) + output_batch.append(new_bboxes) + return output_batch + + def apply_to_keypoints_batch( + self, + keyboints_batch: List[KeypointType], + indices: List[int], + image_shapes: List[Tuple[int, int]], + **params, + ) -> List[KeypointType]: + """Applies the transformation to a batch of keypoints. 
+
+        @param keypoints_batch: Batch of input keypoints to which the transformation
+            is applied.
+        @type keypoints_batch: List[KeypointType]
+        @param indices: Indices of images in the batch.
+        @type indices: List[int]
+        @param image_shapes: Shapes of the input images in the batch.
+        @type image_shapes: List[Tuple[int, int]]
+        @param params: Additional parameters for the transformation.
+        @type params: Any
+        @return: List of transformed keypoints.
+        @rtype: List[KeypointType]
+        """
+        output_batch = []
+        for i_batch in range(self.out_batch_size):
+            idx_chunk = indices[self.n_tiles * i_batch : self.n_tiles * (i_batch + 1)]
+            keypoints_chunk = [keypoints_batch[i] for i in idx_chunk]
+            shape_chunk = [image_shapes[i] for i in idx_chunk]
+            new_keypoints = []
+            for i in range(self.n_tiles):
+                keypoints = keypoints_chunk[i]
+                rows, cols = shape_chunk[i]
+                for keypoint in keypoints:
+                    new_keypoint = keypoint_mosaic4(
+                        keypoint[:4], rows, cols, i, self.out_height, self.out_width
+                    )
+                    new_keypoints.append(new_keypoint + tuple(keypoint[4:]))
+            output_batch.append(new_keypoints)
+        return output_batch
+
+    def get_params_dependent_on_targets(self, params: Dict[str, Any]) -> Dict[str, Any]:
+        """Get parameters dependent on the targets.
+
+        @param params: Dictionary containing parameters.
+        @type params: Dict[str, Any]
+        @return: Dictionary containing parameters dependent on the targets.
+        @rtype: Dict[str, Any]
+        """
+        image_batch = params["image_batch"]
+        n = len(image_batch)
+        if self.n_tiles * self.out_batch_size > n:
+            raise ValueError(
+                f"The batch size (= {n}) should be at least "
+                + f"{self.n_tiles} x out_batch_size (= {self.n_tiles * self.out_batch_size})"
+            )
+        indices = np.random.choice(
+            range(n), size=self.n_tiles * self.out_batch_size, replace=False
+        ).tolist()
+        image_shapes = [tuple(image.shape[:2]) for image in image_batch]
+        return {
+            "indices": indices,
+            "image_shapes": image_shapes,
+        }
+
+
+def mosaic4(
+    image_batch: List[np.ndarray],
+    height: int,
+    width: int,
+    value: Optional[ImageColorType] = None,
+) -> np.ndarray:
+    """Arrange the images in a 2x2 grid layout.
+    The input images should have the same number of channels but can have different
+    widths and heights. The output is cropped around the intersection point of the
+    four images with the size (width x height). If the mosaic image is smaller than
+    width x height, the gap is filled by the `value`.
+    This implementation is based on YOLOv5 with some modification:
+    https://github.com/ultralytics/yolov5/blob/932dc78496ca532a41780335468589ad7f0147f7/utils/datasets.py#L648
+
+    @param image_batch: Image list. The length should be four. Each image can have a
+        different size.
+    @type image_batch: List[np.ndarray]
+    @param height: Height of output mosaic image
+    @type height: int
+    @param width: Width of output mosaic image
+    @type width: int
+    @param value: Padding value
+    @type value: Optional[ImageColorType]
+    @return: Final output image
+    @rtype: np.ndarray
+    """
+    N_TILES = 4
+    if len(image_batch) != N_TILES:
+        raise ValueError(f"Length of image_batch should be 4. Got {len(image_batch)}")
+
+    for i in range(N_TILES - 1):
+        if image_batch[0].shape[2:] != image_batch[i + 1].shape[2:]:
+            raise ValueError(
+                "All images should have the same number of channels."
+                + f" Got the shapes {image_batch[0].shape} and {image_batch[i + 1].shape}"
+            )
+
+        if image_batch[0].dtype != image_batch[i + 1].dtype:
+            raise ValueError(
+                "All images should have the same dtype."
+ + f" Got the dtypes {image_batch[0].dtype} and {image_batch[i + 1].dtype}" + ) + + if len(image_batch[0].shape) == 2: + out_shape = [height, width] + else: + out_shape = [height, width, image_batch[0].shape[2]] + + dtype = image_batch[0].dtype + img4 = np.zeros(out_shape, dtype=dtype) # base image with 4 tiles + + value = 0 if value is None else value + if isinstance(value, (tuple, list, np.ndarray)): + if out_shape[2] != len(value): + ValueError( + "value parameter should has the same lengh as the output channel." + + f" value: ({value}), output shape: {out_shape}" + ) + for i in range(len(value)): + img4[:, :, i] = value[i] + else: + img4[:] = value + + center_x = width // 2 + center_y = height // 2 + for i, img in enumerate(image_batch): + (h, w) = img.shape[:2] + + # place img in img4 + # this based on the yolo5's implementation + # + if i == 0: # top left + x1a, y1a, x2a, y2a = ( + max(center_x - w, 0), + max(center_y - h, 0), + center_x, + center_y, + ) # xmin, ymin, xmax, ymax (large image) + x1b, y1b, x2b, y2b = ( + w - (x2a - x1a), + h - (y2a - y1a), + w, + h, + ) # xmin, ymin, xmax, ymax (small image) + elif i == 1: # top right + x1a, y1a, x2a, y2a = ( + center_x, + max(center_y - h, 0), + min(center_x + w, width), + center_y, + ) + x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h + elif i == 2: # bottom left + x1a, y1a, x2a, y2a = ( + max(center_x - w, 0), + center_y, + center_x, + min(height, center_y + h), + ) + x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, w, min(y2a - y1a, h) + elif i == 3: # bottom right + x1a, y1a, x2a, y2a = ( + center_x, + center_y, + min(center_x + w, width), + min(height, center_y + h), + ) + x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h) + + img4[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b] # img4[ymin:ymax, xmin:xmax] + + return img4 + + +def bbox_mosaic4( + bbox: BoxInternalType, + rows: int, + cols: int, + position_index: int, + height: int, + width: int, +) -> BoxInternalType: + """Put the given bbox in one of the cells of the 2x2 grid. + + @param bbox: A bounding box `(x_min, y_min, x_max, y_max)`. + @type bbox: BoxInternalType + @param rows: Height of input image that corresponds to one of the mosaic cells + @type rows: int + @param cols: Width of input image that corresponds to one of the mosaic cells + @type cols: int + @param position_index: Index of the mosaic cell. 0: top left, 1: top right, 2: + bottom left, 3: bottom right + @type position_index: int + @param height: Height of output mosaic image + @type height: int + @param width: Width of output mosaic image + @type width: int + @return: Transformed bbox + @rtype: BoxInternalType + """ + bbox = denormalize_bbox(bbox, rows, cols) + center_x = width // 2 + center_y = height // 2 + if position_index == 0: # top left + shift_x = center_x - cols + shift_y = center_y - rows + elif position_index == 1: # top right + shift_x = center_x + shift_y = center_y - rows + elif position_index == 2: # bottom left + shift_x = center_x - cols + shift_y = center_y + elif position_index == 3: # bottom right + shift_x = center_x + shift_y = center_y + bbox = ( + bbox[0] + shift_x, + bbox[1] + shift_y, + bbox[2] + shift_x, + bbox[3] + shift_y, + ) + bbox = normalize_bbox(bbox, height, width) + return bbox + + +def keypoint_mosaic4( + keypoint: KeypointInternalType, + rows: int, + cols: int, + position_index: int, + height: int, + width: int, +) -> KeypointInternalType: + """Put the given bbox in one of the cells of the 2x2 grid. + + @param keypoint: A keypoint `(x, y, angle, scale)`. 
+ @type bbox: KeypointInternalType + @param rows: Height of input image that corresponds to one of the mosaic cells + @type rows: int + @param cols: Width of input image that corresponds to one of the mosaic cells + @type cols: int + @param position_index: Index of the mosaic cell. 0: top left, 1: top right, 2: + bottom left, 3: bottom right + @type position_index: int + @param height: Height of output mosaic image + @type height: int + @param width: Width of output mosaic image + @type width: int + @return: Transformed keypoint + @rtype: KeypointInternalType + """ + x, y, angle, scale = keypoint + + center_x = width // 2 + center_y = height // 2 + if position_index == 0: # top left + shift_x = center_x - cols + shift_y = center_y - rows + elif position_index == 1: # top right + shift_x = center_x + shift_y = center_y - rows + elif position_index == 2: # bottom left + shift_x = center_x - cols + shift_y = center_y + elif position_index == 3: # bottom right + shift_x = center_x + shift_y = center_y + return x + shift_x, y + shift_y, angle, scale diff --git a/luxonis_ml/data/augmentations.py b/luxonis_ml/data/augmentations/utils.py similarity index 50% rename from luxonis_ml/data/augmentations.py rename to luxonis_ml/data/augmentations/utils.py index 02832884..76480173 100644 --- a/luxonis_ml/data/augmentations.py +++ b/luxonis_ml/data/augmentations/utils.py @@ -1,22 +1,15 @@ -import random -import warnings -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Tuple import albumentations as A import cv2 import numpy as np -from albumentations.core.bbox_utils import denormalize_bbox, normalize_bbox -from albumentations.core.transforms_interface import ( - BoxInternalType, - BoxType, - DualTransform, - KeypointInternalType, - KeypointType, -) from luxonis_ml.enums import LabelType from luxonis_ml.utils.registry import Registry +from .batch_compose import BatchCompose, ForEach +from .batch_transform import BatchBasedTransform + AUGMENTATIONS = Registry(name="augmentations") @@ -38,7 +31,7 @@ def _parse_cfg( image_size: List[int], augmentations: List[Dict[str, Any]], keep_aspect_ratio: bool = True, - ) -> Tuple[A.BatchCompose, A.Compose]: + ) -> Tuple[BatchCompose, A.Compose]: """Parses provided config and returns Albumentations BatchedCompose object and Compose object for default transforms. @@ -49,7 +42,7 @@ def _parse_cfg( @type keep_aspect_ratio: bool @param keep_aspect_ratio: Whether should use resize that keeps aspect ratio of original image. 
- @rtype: Tuple[A.BatchCompose, A.Compose] + @rtype: Tuple[BatchCompose, A.Compose] @return: Objects for batched and spatial transforms """ @@ -57,7 +50,9 @@ def _parse_cfg( # NOTE: Always perform Resize if keep_aspect_ratio: - resize = LetterboxResize(height=image_size[0], width=image_size[1]) + resize = AUGMENTATIONS.get("LetterboxResize")( + height=image_size[0], width=image_size[1] + ) else: resize = A.Resize(image_size[0], image_size[1]) @@ -71,16 +66,16 @@ def _parse_cfg( pixel_augs.append(curr_aug) elif isinstance(curr_aug, A.DualTransform): spatial_augs.append(curr_aug) - elif isinstance(curr_aug, A.BatchBasedTransform): + elif isinstance(curr_aug, BatchBasedTransform): self.is_batched = True - self.aug_batch_size = max(self.aug_batch_size, curr_aug.n_tiles) + self.aug_batch_size = max(self.aug_batch_size, curr_aug.batch_size) batched_augs.append(curr_aug) # NOTE: always perform resize last spatial_augs.append(resize) - batch_transform = A.BatchCompose( + batch_transform = BatchCompose( [ - A.ForEach(pixel_augs), + ForEach(pixel_augs), *batched_augs, ], bbox_params=A.BboxParams( @@ -217,7 +212,15 @@ def __call__( def prepare_img_annotations( self, annotations: Dict[LabelType, np.ndarray], ih: int, iw: int - ) -> Tuple[np.ndarray]: + ) -> Tuple[ + np.ndarray, + np.ndarray, + np.ndarray, + np.ndarray, + np.ndarray, + np.ndarray, + np.ndarray, + ]: """Prepare annotations to be compatible with albumentations. @type annotations: Dict[LabelType, np.ndarray] @@ -226,7 +229,8 @@ def prepare_img_annotations( @param ih: Input image height @type iw: int @param iw: Input image width - @rtype: Tuple[np.ndarray] + @rtype: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, + np.ndarray, np.ndarray] @return: Annotations in albumentations format """ @@ -272,7 +276,7 @@ def post_transform_process( ns: int, nk: int, filter_kpts_by_bbox: bool, - ) -> Tuple[np.ndarray]: + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Postprocessing of albumentations output to LuxonisLoader format. @type transformed_data: Dict[str, np.ndarray] @@ -284,7 +288,7 @@ def post_transform_process( @type filter_kpts_by_bbox: bool @param filter_kpts_by_bbox: If True removes keypoint instances if its bounding box was removed. - @rtype: Tuple[np.ndarray] + @rtype: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] @return: Postprocessed annotations """ @@ -525,519 +529,3 @@ def __init__( AUGMENTATIONS.register_module(module=A.SmallestMaxSize) AUGMENTATIONS.register_module(module=A.Transpose) AUGMENTATIONS.register_module(module=A.VerticalFlip) - - -@AUGMENTATIONS.register_module() -class LetterboxResize(DualTransform): - def __init__( - self, - height: int, - width: int, - interpolation: int = cv2.INTER_LINEAR, - border_value: int = 0, - mask_value: int = 0, - always_apply: bool = False, - p: float = 1.0, - ): - """Augmentation to apply letterbox resizing to images. Also transforms masks, - bboxes and keypoints to correct shape. - - @param height: Desired height of the output. - @type height: int - @param width: Desired width of the output. - @type width: int - @param interpolation: Cv2 flag to specify interpolation used when resizing. - Defaults to cv2.INTER_LINEAR. - @type interpolation: int, optional - @param border_value: Padding value for images. Defaults to 0. - @type border_value: int, optional - @param mask_value: Padding value for masks. Defaults to 0. - @type mask_value: int, optional - @param always_apply: Whether to always apply the transform. Defaults to False. 
-        @type always_apply: bool, optional
-        @param p: Probability of applying the transform. Defaults to 1.0.
-        @type p: float, optional
-        """
-
-        super().__init__(always_apply, p)
-
-        if not (0 <= border_value <= 255):
-            raise ValueError("Border value must be in range [0,255].")
-
-        if not (0 <= mask_value <= 255):
-            raise ValueError("Mask value must be in range [0,255].")
-
-        self.height = height
-        self.width = width
-        self.interpolation = interpolation
-        self.border_value = border_value
-        self.mask_value = mask_value
-
-    def update_params(self, params: Dict[str, Any], **kwargs) -> Dict[str, Any]:
-        """Updates augmentation parameters with the necessary metadata.
-
-        @param params: The existing augmentation parameters dictionary.
-        @type params: Dict[str, Any]
-        @param kwargs: Additional keyword arguments to add to the parameters.
-        @type kwargs: Any
-        @return: Updated dictionary containing the merged parameters.
-        @rtype: Dict[str, Any]
-        """
-
-        params = super().update_params(params, **kwargs)
-
-        img_height = params["rows"]
-        img_width = params["cols"]
-
-        ratio = min(self.height / img_height, self.width / img_width)
-        new_height = int(img_height * ratio)
-        new_width = int(img_width * ratio)
-
-        # only supports center alignment
-        pad_top = (self.height - new_height) // 2
-        pad_bottom = pad_top
-
-        pad_left = (self.width - new_width) // 2
-        pad_right = pad_left
-
-        params.update(
-            {
-                "pad_top": pad_top,
-                "pad_bottom": pad_bottom,
-                "pad_left": pad_left,
-                "pad_right": pad_right,
-            }
-        )
-
-        return params
-
-    def apply(
-        self,
-        img: np.ndarray,
-        pad_top: int,
-        pad_bottom: int,
-        pad_left: int,
-        pad_right: int,
-        **params,
-    ) -> np.ndarray:
-        """Applies the letterbox augmentation to an image.
-
-        @param img: Input image to which resize is applied.
-        @type img: np.ndarray
-        @param pad_top: Number of pixels to pad at the top.
-        @type pad_top: int
-        @param pad_bottom: Number of pixels to pad at the bottom.
-        @type pad_bottom: int
-        @param pad_left: Number of pixels to pad on the left.
-        @type pad_left: int
-        @param pad_right: Number of pixels to pad on the right.
-        @type pad_right: int
-        @param params: Additional parameters for the padding operation.
-        @type params: Any
-        @return: Image with applied letterbox resize.
-        @rtype: np.ndarray
-        """
-
-        resized_img = cv2.resize(
-            img,
-            (self.width - pad_left - pad_right, self.height - pad_top - pad_bottom),
-            interpolation=self.interpolation,
-        )
-        img_out = cv2.copyMakeBorder(
-            resized_img,
-            pad_top,
-            pad_bottom,
-            pad_left,
-            pad_right,
-            cv2.BORDER_CONSTANT,
-            self.border_value,
-        )
-        img_out = img_out.astype(img.dtype)
-        return img_out
-
-    def apply_to_mask(
-        self,
-        img: np.ndarray,
-        pad_top: int,
-        pad_bottom: int,
-        pad_left: int,
-        pad_right: int,
-        **params,
-    ) -> np.ndarray:
-        """Applies letterbox augmentation to the input mask.
-
-        @param img: Input mask to which resize is applied.
-        @type img: np.ndarray
-        @param pad_top: Number of pixels to pad at the top.
-        @type pad_top: int
-        @param pad_bottom: Number of pixels to pad at the bottom.
-        @type pad_bottom: int
-        @param pad_left: Number of pixels to pad on the left.
-        @type pad_left: int
-        @param pad_right: Number of pixels to pad on the right.
-        @type pad_right: int
-        @param params: Additional parameters for the padding operation.
-        @type params: Any
-        @return: Mask with applied letterbox resize.
-        @rtype: np.ndarray
-        """
-
-        resized_img = cv2.resize(
-            img,
-            (self.width - pad_left - pad_right, self.height - pad_top - pad_bottom),
-            interpolation=cv2.INTER_NEAREST,
-        )
-        img_out = cv2.copyMakeBorder(
-            resized_img,
-            pad_top,
-            pad_bottom,
-            pad_left,
-            pad_right,
-            cv2.BORDER_CONSTANT,
-            self.mask_value,
-        )
-        img_out = img_out.astype(img.dtype)
-        return img_out
-
-    def apply_to_bbox(
-        self,
-        bbox: BoxInternalType,
-        pad_top: int,
-        pad_bottom: int,
-        pad_left: int,
-        pad_right: int,
-        **params,
-    ) -> BoxInternalType:
-        """Applies letterbox augmentation to the bounding box.
-
-        @param bbox: Bounding box to which resize is applied.
-        @type bbox: BoxInternalType
-        @param pad_top: Number of pixels to pad at the top.
-        @type pad_top: int
-        @param pad_bottom: Number of pixels to pad at the bottom.
-        @type pad_bottom: int
-        @param pad_left: Number of pixels to pad on the left.
-        @type pad_left: int
-        @param pad_right: Number of pixels to pad on the right.
-        @type pad_right: int
-        @param params: Additional parameters for the padding operation.
-        @type params: Any
-        @return: Bounding box with applied letterbox resize.
-        @rtype: BoxInternalType
-        """
-
-        x_min, y_min, x_max, y_max = denormalize_bbox(
-            bbox, self.height - pad_top - pad_bottom, self.width - pad_left - pad_right
-        )[:4]
-        bbox = np.array(
-            [x_min + pad_left, y_min + pad_top, x_max + pad_left, y_max + pad_top]
-        )
-        # clip bbox to image, ignoring padding
-        bbox = bbox.clip(
-            min=[pad_left, pad_top] * 2,
-            max=[params["cols"] + pad_left, params["rows"] + pad_top] * 2,
-        ).tolist()
-        return normalize_bbox(bbox, self.height, self.width)
-
-    def apply_to_keypoint(
-        self,
-        keypoint: KeypointInternalType,
-        pad_top: int,
-        pad_bottom: int,
-        pad_left: int,
-        pad_right: int,
-        **params,
-    ) -> KeypointInternalType:
-        """Applies letterbox augmentation to the keypoint.
-
-        @param keypoint: Keypoint to which resize is applied.
-        @type keypoint: KeypointInternalType
-        @param pad_top: Number of pixels to pad at the top.
-        @type pad_top: int
-        @param pad_bottom: Number of pixels to pad at the bottom.
-        @type pad_bottom: int
-        @param pad_left: Number of pixels to pad on the left.
-        @type pad_left: int
-        @param pad_right: Number of pixels to pad on the right.
-        @type pad_right: int
-        @param params: Additional parameters for the padding operation.
-        @type params: Any
-        @return: Keypoint with applied letterbox resize.
-        @rtype: KeypointInternalType
-        """
-
-        x, y, angle, scale = keypoint[:4]
-        scale_x = (self.width - pad_left - pad_right) / params["cols"]
-        scale_y = (self.height - pad_top - pad_bottom) / params["rows"]
-        new_x = (x * scale_x) + pad_left
-        new_y = (y * scale_y) + pad_top
-        # if the keypoint lands in the padding, set its coordinates to -1
-        out_keypoint = (
-            new_x
-            if not self._out_of_bounds(new_x, pad_left, params["cols"] + pad_left)
-            else -1,
-            new_y
-            if not self._out_of_bounds(new_y, pad_top, params["rows"] + pad_top)
-            else -1,
-            angle,
-            scale * max(scale_x, scale_y),
-        )
-        return out_keypoint
-
-    def get_transform_init_args_names(self) -> Tuple[str, ...]:
-        """Gets the default arguments for the letterbox augmentation.
-
-        @return: The string keywords of the arguments.
-        @rtype: Tuple[str, ...]
-        """
-
-        return ("height", "width", "interpolation", "border_value", "mask_value")
-
-    def _out_of_bounds(self, value: float, min_limit: float, max_limit: float) -> bool:
-        """Check if the given value is outside the specified limits.
-
-        @param value: The value to be checked.
-        @type value: float
-        @param min_limit: Minimum limit.
-        @type min_limit: float
-        @param max_limit: Maximum limit.
-        @type max_limit: float
-        @return: True if the value is outside the specified limits, False otherwise.
-        @rtype: bool
-        """
-        return value < min_limit or value > max_limit
-
-
-@AUGMENTATIONS.register_module(name="Mosaic4")
-class DeterministicMosaic4(A.Mosaic4):
-    def __init__(
-        self,
-        out_height: int,
-        out_width: int,
-        value: Optional[Union[int, float, List[int], List[float]]] = None,
-        replace: bool = False,
-        out_batch_size: int = 1,
-        mask_value: Optional[Union[int, float, List[int], List[float]]] = None,
-        always_apply: bool = False,
-        p: float = 0.5,
-    ):
-        """Mosaic augmentation arranges four selected images into a single image in a
-        2x2 grid layout. This is done in a deterministic way, meaning the first image
-        in the batch is always placed in the top left. The input images should have
-        the same number of channels but can have different widths and heights. The
-        output is cropped around the intersection point of the four images with the
-        size (out_width x out_height). If the mosaic image is smaller than
-        (out_width x out_height), the gap is filled by the `value`.
-
-        @param out_height: Output image height. The mosaic image is cropped by this height around the mosaic center.
-            If the size of the mosaic image is smaller than this value the gap is filled by the `value`.
-        @type out_height: int
-
-        @param out_width: Output image width. The mosaic image is cropped by this width around the mosaic center.
-            If the size of the mosaic image is smaller than this value the gap is filled by the `value`.
-        @type out_width: int
-
-        @param value: Padding value. Defaults to None.
-        @type value: Optional[Union[int, float, List[int], List[float]]], optional
-
-        @param replace: Whether to replace the original images in the mosaic. The current
-            implementation only supports this set to False. Defaults to False.
-        @type replace: bool, optional
-
-        @param out_batch_size: Number of output images in the batch. Defaults to 1.
-        @type out_batch_size: int, optional
-
-        @param mask_value: Padding value for masks. Defaults to None.
-        @type mask_value: Optional[Union[int, float, List[int], List[float]]], optional
-
-        @param always_apply: Whether to always apply the transform. Defaults to False.
-        @type always_apply: bool, optional
-
-        @param p: Probability of applying the transform. Defaults to 0.5.
-        @type p: float, optional
-        """
-
-        super().__init__(
-            out_height,
-            out_width,
-            value,
-            replace,
-            out_batch_size,
-            mask_value,
-            always_apply,
-            p,
-        )
-        warnings.warn(
-            "Only the deterministic version of Mosaic4 is available, setting replace=False."
-        )
-        self.replace = False
-
-    def get_params_dependent_on_targets(self, params: Dict[str, Any]) -> Dict[str, Any]:
-        """Get parameters dependent on the targets.
-
-        @param params: Dictionary containing parameters.
-        @type params: Dict[str, Any]
-        @return: Dictionary containing parameters dependent on the targets.
-        @rtype: Dict[str, Any]
-        """
-        target_params = super().get_params_dependent_on_targets(params)
-        target_params["indices"] = list(range(self.n_tiles))
-        return target_params
-
-
-@AUGMENTATIONS.register_module()
-class MixUp(A.BatchBasedTransform):
-    def __init__(
-        self,
-        alpha: Union[float, Tuple[float, float]] = 0.5,
-        always_apply: bool = False,
-        p: float = 0.5,
-    ):
-        """MixUp augmentation that merges two images and their annotations into one.
-        If the images are not of the same size, the second one is resized to match
-        the first one.
-
-        @param alpha: Mixing coefficient, either a single float or a tuple representing
-            the range. Defaults to 0.5.
-        @type alpha: Union[float, Tuple[float, float]], optional
-        @param always_apply: Whether to always apply the transform. Defaults to False.
-        @type always_apply: bool, optional
-        @param p: Probability of applying the transform. Defaults to 0.5.
-        @type p: float, optional
-        """
-        super().__init__(always_apply=always_apply, p=p)
-
-        self.alpha = alpha
-        self.n_tiles = 2
-        self.out_batch_size = 1
-
-    def get_transform_init_args_names(self) -> Tuple[str, ...]:
-        """Gets the default arguments for the MixUp augmentation.
-
-        @return: The string keywords of the arguments.
-        @rtype: Tuple[str, ...]
-        """
-        return ("alpha", "out_batch_size")
-
-    @property
-    def targets_as_params(self) -> List[str]:
-        """List of augmentation targets.
-
-        @return: Output list of augmentation targets.
-        @rtype: List[str]
-        """
-        return ["image_batch"]
-
-    def apply_to_image_batch(
-        self,
-        image_batch: List[np.ndarray],
-        image_shapes: List[Tuple[int, int]],
-        **params,
-    ) -> List[np.ndarray]:
-        """Applies the transformation to a batch of images.
-
-        @param image_batch: Batch of input images to which the transformation is
-            applied.
-        @type image_batch: List[np.ndarray]
-        @param image_shapes: Shapes of the input images in the batch.
-        @type image_shapes: List[Tuple[int, int]]
-        @param params: Additional parameters for the transformation.
-        @type params: Any
-        @return: List of transformed images.
-        @rtype: List[np.ndarray]
-        """
-        image1 = image_batch[0]
-        # resize the second image to the size of the first one
-        image2 = cv2.resize(image_batch[1], (image_shapes[0][1], image_shapes[0][0]))
-
-        if isinstance(self.alpha, float):
-            curr_alpha = np.clip(self.alpha, 0, 1)
-        else:
-            curr_alpha = random.uniform(max(self.alpha[0], 0), min(self.alpha[1], 1))
-        img_out = cv2.addWeighted(image1, curr_alpha, image2, 1 - curr_alpha, 0.0)
-        return [img_out]
-
-    def apply_to_mask_batch(
-        self,
-        mask_batch: List[np.ndarray],
-        image_shapes: List[Tuple[int, int]],
-        **params,
-    ) -> List[np.ndarray]:
-        """Applies the transformation to a batch of masks.
-
-        @param mask_batch: Batch of input masks to which the transformation is applied.
-        @type mask_batch: List[np.ndarray]
-        @param image_shapes: Shapes of the input images in the batch.
-        @type image_shapes: List[Tuple[int, int]]
-        @param params: Additional parameters for the transformation.
-        @type params: Any
-        @return: List of transformed masks.
-        @rtype: List[np.ndarray]
-        """
-        mask1 = mask_batch[0]
-        mask2 = cv2.resize(
-            mask_batch[1],
-            (image_shapes[0][1], image_shapes[0][0]),
-            interpolation=cv2.INTER_NEAREST,
-        )
-        out_mask = mask1 + mask2
-        # where the masks intersect, keep the values from the first image
-        mask_inter = mask1 > 0
-        out_mask[mask_inter] = mask1[mask_inter]
-        return [out_mask]
-
-    def apply_to_bboxes_batch(
-        self, bboxes_batch: List[BoxType], image_shapes: List[Tuple[int, int]], **params
-    ) -> List[BoxType]:
-        """Applies the transformation to a batch of bboxes.
-
-        @param bboxes_batch: Batch of input bboxes to which the transformation is
-            applied.
-        @type bboxes_batch: List[BoxType]
-        @param image_shapes: Shapes of the input images in the batch.
-        @type image_shapes: List[Tuple[int, int]]
-        @param params: Additional parameters for the transformation.
-        @type params: Any
-        @return: List of transformed bboxes.
-        @rtype: List[BoxType]
-        """
-        return [bboxes_batch[0] + bboxes_batch[1]]
-
-    def apply_to_keypoints_batch(
-        self,
-        keypoints_batch: List[KeypointType],
-        image_shapes: List[Tuple[int, int]],
-        **params,
-    ) -> List[KeypointType]:
-        """Applies the transformation to a batch of keypoints.
-
-        @param keypoints_batch: Batch of input keypoints to which the transformation
-            is applied.
-        @type keypoints_batch: List[KeypointType]
-        @param image_shapes: Shapes of the input images in the batch.
-        @type image_shapes: List[Tuple[int, int]]
-        @param params: Additional parameters for the transformation.
-        @type params: Any
-        @return: List of transformed keypoints.
-        @rtype: List[KeypointType]
-        """
-        scaled_kpts2 = []
-        scale_x = image_shapes[0][1] / image_shapes[1][1]
-        scale_y = image_shapes[0][0] / image_shapes[1][0]
-        for kpt in keypoints_batch[1]:
-            new_kpt = A.augmentations.geometric.functional.keypoint_scale(
-                keypoint=kpt, scale_x=scale_x, scale_y=scale_y
-            )
-            scaled_kpts2.append(new_kpt + kpt[4:])
-        return [keypoints_batch[0] + scaled_kpts2]
-
-    def get_params_dependent_on_targets(self, params: Dict[str, Any]) -> Dict[str, Any]:
-        """Get parameters dependent on the targets.
-
-        @param params: Dictionary containing parameters.
-        @type params: Dict[str, Any]
-        @return: Dictionary containing parameters dependent on the targets.
-        @rtype: Dict[str, Any]
-        """
-        image_batch = params["image_batch"]
-        return {"image_shapes": [image.shape[:2] for image in image_batch]}
diff --git a/luxonis_ml/data/requirements.txt b/luxonis_ml/data/requirements.txt
index 2a6b94b4..cb535adb 100644
--- a/luxonis_ml/data/requirements.txt
+++ b/luxonis_ml/data/requirements.txt
@@ -6,7 +6,7 @@ PyYAML>=6.0
 label-studio-sdk>=0.0.28
 # boto3>=1.17.39
 # google-cloud-storage>=2.10.0
-albumentations @ git+https://github.com/i-aki-y/albumentations.git@c137e2b # albumentations fork with batched transform support
+albumentations>=1.3.1
 pandas>=2.0.0
 pyarrow>=13.0.0
 pycocotools>=2.0.7
diff --git a/media/coverage_badge.svg b/media/coverage_badge.svg
index 1d809e79..e123fe5b 100644
--- a/media/coverage_badge.svg
+++ b/media/coverage_badge.svg
@@ -15,7 +15,7 @@
 coverage
 coverage
-10%
-10%
+9%
+9%
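
The `AUGMENTATIONS.get("LetterboxResize")(...)` call in `_parse_cfg` resolves a class from its config name, which is what lets augmentations be listed by name in a config file. A minimal standalone sketch of that pattern follows; this is not the actual `luxonis_ml.utils.registry.Registry` implementation, only an illustration of the interface the diff relies on (`register_module` in both direct and decorator form, plus `get`):

from typing import Any, Dict, Optional


class Registry:
    """Toy name-to-class registry; illustrative only."""

    def __init__(self, name: str):
        self.name = name
        self._modules: Dict[str, Any] = {}

    def register_module(self, module: Optional[type] = None, name: Optional[str] = None) -> Any:
        # Direct form: registry.register_module(module=SomeClass)
        if module is not None:
            self._modules[name or module.__name__] = module
            return module

        # Decorator form: @registry.register_module() or @registry.register_module(name="Alias")
        def wrapper(cls: type) -> type:
            self._modules[name or cls.__name__] = cls
            return cls

        return wrapper

    def get(self, key: str) -> Any:
        return self._modules[key]


AUGMENTATIONS = Registry(name="augmentations")


@AUGMENTATIONS.register_module()
class LetterboxResize:  # stand-in for the real transform class
    def __init__(self, height: int, width: int):
        self.height, self.width = height, width


resize = AUGMENTATIONS.get("LetterboxResize")(height=256, width=320)
assert (resize.height, resize.width) == (256, 320)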
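
The letterbox geometry in `update_params`/`apply` reduces to one scale ratio and two padding values. Below is a minimal self-contained sketch of the same center-aligned computation (the `letterbox` function name is illustrative, no albumentations dependency); as in the original, padding is split evenly on both sides and any odd leftover pixel is absorbed by the resize target so the output size stays exact:

import cv2
import numpy as np


def letterbox(img: np.ndarray, out_h: int, out_w: int, border_value: int = 0) -> np.ndarray:
    ih, iw = img.shape[:2]
    # One ratio keeps the aspect; the shorter relative side gets padded.
    ratio = min(out_h / ih, out_w / iw)
    new_h, new_w = int(ih * ratio), int(iw * ratio)
    # Center alignment: equal padding on both sides, as in LetterboxResize.
    pad_top = pad_bottom = (out_h - new_h) // 2
    pad_left = pad_right = (out_w - new_w) // 2
    # Resize to whatever is left after padding, so the final size is exact.
    resized = cv2.resize(
        img,
        (out_w - pad_left - pad_right, out_h - pad_top - pad_bottom),
        interpolation=cv2.INTER_LINEAR,
    )
    return cv2.copyMakeBorder(
        resized, pad_top, pad_bottom, pad_left, pad_right,
        cv2.BORDER_CONSTANT, value=border_value,
    )


out = letterbox(np.zeros((480, 640, 3), dtype=np.uint8), 256, 256)
assert out.shape[:2] == (256, 256)  # 640x480 scaled to 256x192, padded to 256x256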
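
The keypoint arithmetic for the mosaic cells (first hunk of this section) can be exercised in isolation. A minimal sketch that mirrors the shift logic, with an illustrative function name and the same (x, y, angle, scale) tuple layout:

from typing import Tuple

KeypointT = Tuple[float, float, float, float]  # (x, y, angle, scale)


def shift_mosaic_keypoint(
    keypoint: KeypointT, rows: int, cols: int, position_index: int, height: int, width: int
) -> KeypointT:
    # Cells 0 and 2 sit left of the mosaic center; cells 0 and 1 sit above it.
    x, y, angle, scale = keypoint
    center_x, center_y = width // 2, height // 2
    shift_x = center_x - cols if position_index in (0, 2) else center_x
    shift_y = center_y - rows if position_index in (0, 1) else center_y
    return x + shift_x, y + shift_y, angle, scale


# A point at the bottom-right corner of a 200x300 top-left cell lands exactly
# at the center of a 400x600 mosaic.
assert shift_mosaic_keypoint((300, 200, 0, 1), 200, 300, 0, 400, 600)[:2] == (300, 200)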
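
Finally, the image path of MixUp boils down to a resize plus a weighted blend with cv2.addWeighted. A minimal sketch under the same assumptions (two uint8 images, alpha given as a (low, high) range; `mixup_pair` is an illustrative name, not part of the package):

import random

import cv2
import numpy as np


def mixup_pair(img1: np.ndarray, img2: np.ndarray, alpha=(0.3, 0.7)) -> np.ndarray:
    # Resize the second image to the size of the first, as MixUp does.
    h, w = img1.shape[:2]
    img2 = cv2.resize(img2, (w, h))
    # Sample the mixing coefficient, clipping the range to [0, 1].
    a = random.uniform(max(alpha[0], 0.0), min(alpha[1], 1.0))
    return cv2.addWeighted(img1, a, img2, 1.0 - a, 0.0)


blended = mixup_pair(
    np.full((64, 64, 3), 255, dtype=np.uint8),
    np.zeros((32, 48, 3), dtype=np.uint8),
)
assert blended.shape == (64, 64, 3)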