diff --git a/dali/pipeline/operator/builtin/copy.cc b/dali/pipeline/operator/builtin/copy.cc index 4bb34f9287..798cd2e5f3 100644 --- a/dali/pipeline/operator/builtin/copy.cc +++ b/dali/pipeline/operator/builtin/copy.cc @@ -38,6 +38,7 @@ void Copy::RunImpl(Workspace &ws) { } else { auto &input = ws.Input(0); auto &output = ws.Output(0); + DeviceGuard g(input.device_id()); output.Copy(input, ws.output_order()); } } diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index 529cbbfb96..c03d20bbb2 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -792,6 +792,9 @@ void ExposeTensor(py::module &m) { .def("layout", [](Tensor &t) { return t.GetLayout().str(); }) + .def("set_layout", [](Tensor &t, const std::optional &layout) { + SetLayout(t, layout); + }) .def("source_info", &Tensor::GetSourceInfo, R"(Gets a string descrbing the source of the data in the tensor, e.g. a name of the file from which the data was loaded.)") @@ -816,6 +819,15 @@ void ExposeTensor(py::module &m) { }, R"code(Passthrough, since the object is already an instance of `TensorCPU`.)code", py::return_value_policy::reference_internal) + .def("_make_copy", [](const Tensor &t) { + auto dst = std::make_unique>(); + dst->set_device_id(t.device_id()); + dst->set_order(t.order()); + dst->set_pinned(t.is_pinned()); + dst->Copy(t); + return dst; + }, + py::return_value_policy::take_ownership) .def("copy_to_external", [](Tensor &t, py::object p) { CopyToExternal(ctypes_void_ptr(p), t, AccessOrder::host(), false); @@ -961,16 +973,19 @@ void ExposeTensor(py::module &m) { .def("layout", [](Tensor &t) { return t.GetLayout().str(); }) + .def("set_layout", [](Tensor &t, const std::optional &layout) { + SetLayout(t, layout); + }) .def("source_info", &Tensor::GetSourceInfo, R"(Gets a string descrbing the source of the data in the tensor, e.g. a name of the file from which the data was loaded.)") .def("get_property", GetTensorProperty) .def("as_cpu", [](Tensor &t) -> Tensor* { + DeviceGuard g(t.device_id()); auto ret = std::make_unique>(); ret->set_pinned(false); UserStream * us = UserStream::Get(); cudaStream_t s = us->GetStream(t); - DeviceGuard g(t.device_id()); ret->Copy(t, s); us->Wait(t); return ret.release(); @@ -979,6 +994,15 @@ void ExposeTensor(py::module &m) { Returns a `TensorCPU` object being a copy of this `TensorGPU`. 
)code", py::return_value_policy::take_ownership) + .def("_make_copy", [](const Tensor &t) { + DeviceGuard dg(t.device_id()); + auto dst = std::make_unique>(); + dst->set_device_id(t.device_id()); + dst->set_order(t.order()); + dst->Copy(t); + return dst; + }, + py::return_value_policy::take_ownership) .def("squeeze", [](Tensor &t, py::object dim_arg) -> bool { if (!dim_arg.is_none()) { @@ -1301,6 +1325,9 @@ void ExposeTensorListCPU(py::module &m) { layout : str Layout of the data )code") + .def_static("broadcast", [](const Tensor &t, int num_samples) { + return std::make_shared>(t, num_samples); + }) .def("_as_gpu", [](TensorList &t) { auto ret = std::make_shared>(); int dev = -1; @@ -1320,9 +1347,20 @@ void ExposeTensorListCPU(py::module &m) { return t; }, R"code(Passthrough, as it is already an instance of `TensorListCPU`.)code", py::return_value_policy::reference_internal) + .def("_make_copy", [](const TensorList &t) { + auto dst = std::make_shared>(); + dst->set_device_id(t.device_id()); + dst->set_order(t.order()); + dst->set_pinned(t.is_pinned()); + dst->Copy(t); + return dst; + }) .def("layout", [](TensorList &t) { return t.GetLayout().str(); }) + .def("set_layout", [](TensorList &t, const std::optional &layout) { + SetLayout(t, layout); + }) .def("shape", &py_shape_list, R"code( Shape of the tensor list. @@ -1567,13 +1605,16 @@ void ExposeTesorListGPU(py::module &m) { R"code( List of tensors residing in the GPU memory. )code") + .def_static("broadcast", [](const Tensor &t, int num_samples) { + return std::make_shared>(t, num_samples); + }) .def("as_cpu", [](TensorList &t) { + DeviceGuard g(t.device_id()); auto ret = std::make_shared>(); ret->set_pinned(false); ret->SetContiguity(BatchContiguity::Contiguous); UserStream * us = UserStream::Get(); cudaStream_t s = us->GetStream(t); - DeviceGuard g(t.device_id()); ret->Copy(t, s); us->Wait(t); return ret; @@ -1582,6 +1623,15 @@ void ExposeTesorListGPU(py::module &m) { Returns a `TensorListCPU` object being a copy of this `TensorListGPU`. )code", py::return_value_policy::take_ownership) + .def("_make_copy", [](const TensorList &tl) { + DeviceGuard dg(tl.device_id()); + auto dst = std::make_shared>(); + dst->set_device_id(tl.device_id()); + dst->set_order(tl.order()); + dst->set_pinned(tl.is_pinned()); + dst->Copy(tl); + return dst; + }) .def( "device_id", &TensorList::device_id) .def("shape", &py_shape_list, @@ -1653,6 +1703,9 @@ void ExposeTesorListGPU(py::module &m) { .def("layout", [](TensorList &t) { return t.GetLayout().str(); }) + .def("set_layout", [](TensorList &t, const std::optional &layout) { + SetLayout(t, layout); + }) .def("as_reshaped_tensor", [](TensorList &tl, const vector &new_shape) { return tl.AsReshapedTensor(new_shape); diff --git a/dali/python/nvidia/dali/experimental/dali2/__init__.py b/dali/python/nvidia/dali/experimental/dali2/__init__.py index a0fd204698..334c9b35b1 100644 --- a/dali/python/nvidia/dali/experimental/dali2/__init__.py +++ b/dali/python/nvidia/dali/experimental/dali2/__init__.py @@ -20,3 +20,11 @@ from ._eval_context import * # noqa: F401, F403 from ._type import * # noqa: F401, F403 from ._device import * # noqa: F401, F403 +from ._tensor import Tensor, tensor, as_tensor # noqa: F401 +from ._batch import Batch, batch, as_batch # noqa: F401 + +from . import _fn +from . 
import ops + +ops._initialize() +_fn._initialize() diff --git a/dali/python/nvidia/dali/experimental/dali2/_batch.py b/dali/python/nvidia/dali/experimental/dali2/_batch.py new file mode 100644 index 0000000000..f96d573788 --- /dev/null +++ b/dali/python/nvidia/dali/experimental/dali2/_batch.py @@ -0,0 +1,730 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Optional, Union, Sequence +from ._type import DType, dtype as _dtype +from ._tensor import ( + Tensor, + _is_full_slice, + _try_convert_enums, + tensor as _tensor, + as_tensor as _as_tensor, +) +import nvidia.dali.backend as _backend +from ._eval_context import EvalContext as _EvalContext +from ._device import Device +from . import _eval_mode +from . import _invocation +import nvtx + + +def _backend_device(backend: Union[_backend.TensorListCPU, _backend.TensorListGPU]) -> Device: + if isinstance(backend, _backend.TensorListCPU): + return Device("cpu") + elif isinstance(backend, _backend.TensorListGPU): + return Device("gpu", backend.device_id()) + else: + raise ValueError(f"Unsupported backend type: {type(backend)}") + + +def _is_tensor_type(x): + from . import _batch + + if isinstance(x, _batch.Batch): + raise ValueError("A list of Batch objects is not a valid argument type") + if isinstance(x, Tensor): + return True + if hasattr(x, "__array__"): + return True + if hasattr(x, "__cuda_array_interface__"): + return True + if hasattr(x, "__dlpack__"): + return True + return False + + +def _get_batch_size(x): + if isinstance(x, Batch): + return x.batch_size + if isinstance(x, (_backend.TensorListCPU, _backend.TensorListGPU)): + return len(x) + return None + + +class BatchedSlice: + def __init__(self, batch: "Batch"): + self._batch = batch + + def __getitem__(self, ranges: Any) -> "Batch": + if not isinstance(ranges, tuple): + ranges = (ranges,) + if len(ranges) == 0: + return self._batch + + if all(_is_full_slice(r) for r in ranges): + return self._batch + + args = {} + d = 0 + for i, r in enumerate(ranges): + if r is Ellipsis: + d = self._batch.ndim - len(ranges) + i + 1 + elif isinstance(r, slice): + if r.start is not None: + args[f"lo_{d}"] = r.start + if r.stop is not None: + args[f"hi_{d}"] = r.stop + if r.step is not None: + args[f"step_{d}"] = r.step + d += 1 + else: + args[f"at_{d}"] = r + d += 1 + + from . import tensor_subscript + + return tensor_subscript(self._batch, **args) + + +def _arithm_op(name, *args, **kwargs): + from . 
import arithmetic_generic_op + + argsstr = " ".join(f"&{i}" for i in range(len(args))) + return arithmetic_generic_op(*args, expression_desc=f"{name}({argsstr})") + + +class _TensorList: + def __init__(self, batch: "Batch", indices: Optional[Union[list[int], range]] = None): + self._batch = batch + self._indices = indices or range(batch.batch_size) + + def __getitem__(self, selection: Union[int, slice, list[int]]): + return self.select(selection) + + def __len__(self): + return len(self._indices) + + def select(self, selection): + if selection == slice(None, None, None): + return self + if isinstance(selection, slice): + return _TensorList(self._batch, self._indices[selection]) + elif isinstance(selection, list): + return _TensorList(self._batch, [self._indices[i] for i in selection]) + else: + return self._batch.select(selection) + + def tolist(self): + return [self._batch._get_tensor(i) for i in self._indices] + + def as_batch(self): + return as_batch(self) + + +class Batch: + def __init__( + self, + tensors: Optional[Any] = None, + dtype: Optional[DType] = None, + device: Optional[Device] = None, + layout: Optional[str] = None, + invocation_result: Optional[_invocation.InvocationResult] = None, + copy: bool = False, + ): + """Constructs a Batch object. + Batch objects should not be constructed directly, use batch or as_batch instead. + + The batch object can be created either from an existing object, passed as `tensors` or + from an invocation result. + Unless explicitly requested with the `copy` parameter, this constructor will make best + effort to avoid the copy. + + Parameters + ---------- + tensors : TensorLike, default: None + The data to construct the batch from. It can be a list of tensors, a TensorList, + or other supported types. If None, the batch is constructed from an invocation result. + Supported types are: + - a list of tensor-like objects; the objects need to have matching number of dimensions, + data types and layouts, + - a tensor-like object; the outermost dimenion is interpreted as the batch dimension + - a dali.backend.TensorListCPU or dali.backend.TensorListGPU + dtype : DType, default: None + The desired data type of the batch. If not specified, the data type is inferred + from the input tensors. If specified, the input tensors are cast to the desired + data type. The `dtype` is required if `tensors` are an empty list. + device : Device or str, optional, default: None + The device on which the batch should reside (e.g., "cpu" or "gpu"). + If not specified, the device is inferred from the input tensors. + layout : str, optional, default: None + The layout string describing the dimensions of the batch (e.g., "HWC"). + If not specified, the layout is inferred from the input tensors. + invocation_result : _invocation.InvocationResult, default: None + The result of a DALI operator invocation, used for lazy evaluation + copy : bool, optional, default: False + If True, the input tensors are copied. If False, the constructor will avoid + copying data when possible. 
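+
+        For illustration, assuming ``np`` refers to ``numpy``, the accepted forms described
+        above could look like:
+
+        ```Python
+        Batch([np.zeros((2, 2)), np.ones((3, 2))])  # a list of tensor-like samples
+        Batch(np.zeros((4, 2, 2)))                  # the outermost dimension is the batch dimension
+        ```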
+ """ + assert isinstance(layout, str) or layout is None + if device is not None and not isinstance(device, Device): + device = Device(device) + self._wraps_external_data = False + self._tensors = None + self._backend = None + self._dtype = None + self._device = None + self._invocation_result = None + copied = False + if tensors is not None: + if isinstance(tensors, (_backend.TensorListCPU, _backend.TensorListGPU)): + backend_dev = _backend_device(tensors) + if ( + (device is None or device == backend_dev) + and (dtype is None or dtype.type_id == tensors.dtype) + and (layout is None or layout == tensors.layout()) + ): + self._backend = tensors + self._device = backend_dev + self._layout = tensors.layout() + self._dtype = _dtype(tensors.dtype) + else: + tmp = Batch(tensors) + if device is not None and device != tmp.device: + tmp = tmp.to_device(device) + copied = True + if dtype is not None and dtype != tmp.dtype: + from . import cast + + tmp = cast(tmp, dtype=dtype, device=device) + copied = True + self.assign(tmp) + if self._backend and layout: + self._backend.set_layout(layout) + elif _is_tensor_type(tensors): + if copy: + t = _tensor(tensors, dtype=dtype, device=device, layout=layout) + else: + t = _as_tensor(tensors, dtype=dtype, device=device, layout=layout) + if t.ndim == 0: + raise ValueError("Cannot create a batch from a scalar") + if dtype is None: + dtype = t.dtype + if device is None: + device = t.device + if layout is None: + layout = t.layout + if t._backend is not None: + if isinstance(t._backend, _backend.TensorCPU): + self._backend = _backend.TensorListCPU(t._backend, layout=layout) + elif isinstance(t._backend, _backend.TensorGPU): + self._backend = _backend.TensorListGPU(t._backend, layout=layout) + else: + raise ValueError(f"Unsupported device type: {t.device.device_type}") + if t._wraps_external_data: + self._wraps_external_data = True + else: + sh = t.shape + tensors = [t[i] for i in range(sh[0])] + self._dtype = dtype + + else: + self._tensors = [] + for i, t in enumerate(tensors): + if t is None: + raise TypeError( + f"Tensors must be array-like types or numbers. Got `None` at index {i}" + ) + sample = Tensor(t, dtype=dtype, device=device, layout=layout) + if dtype is None: + dtype = sample.dtype + if device is None: + device = sample.device + if layout is None: + layout = sample.layout + self._tensors.append(sample) + if sample._wraps_external_data: + self._wraps_external_data = True + else: + if not isinstance(t, Tensor) or t._backend is not sample._backend: + copied = True + if dtype is None: + # We would have set dtype in the 1st iteration, so the only way it can + # be None is if the `_tensors` are empty. 
+ assert len(self._tensors) == 0 + raise ValueError("Element type must be specified if the list is empty") + if device is None: + device = Device("cpu") + if layout is None: + layout = "" + self._device = device + self._layout = layout + self._dtype = dtype + if len(self._tensors) == 0: + with device: + t = Tensor([], dtype=dtype, device=device).evaluate() + if self._device.device_type == "cpu": + backend_type = _backend.TensorListCPU + elif self._device.device_type == "gpu": + backend_type = _backend.TensorListGPU + self._backend = backend_type(t._backend, layout=layout) + + if self._dtype is None: + if self._backend is not None: + self._dtype = DType.from_type_id(self._backend.dtype) + else: + self._dtype = dtype + if self._device is None: + if self._backend is not None: + self._device = _backend_device(self._backend) + else: + self._device = device + self._layout = layout + if self._invocation_result is None: + self._invocation_result = invocation_result + else: + assert invocation_result is None or invocation_result is self._invocation_result + self._ndim = None + if self._tensors and self._tensors[0]._shape: + self._ndim = len(self._tensors[0]._shape) + + if copy and not copied: + dev = self.to_device(self.device, force_copy=True) + if dtype is not None and dev.dtype != dtype: + from . import cast + + dev = cast(dev, dtype=dtype, device=device) + self.assign(dev.evaluate()) + copied = True + else: + if self._dtype is not None and dtype is not None and self._dtype != dtype: + from . import cast + + self.assign(cast(self, dtype=dtype, device=device)) + + if _eval_mode.EvalMode.current().value >= _eval_mode.EvalMode.eager.value: + self.evaluate() + + def _is_external(self) -> bool: + return self._wraps_external_data + + @staticmethod + def broadcast(sample, batch_size: int, device: Optional[Device] = None) -> "Batch": + if isinstance(sample, Batch): + raise ValueError("Cannot broadcast a Batch") + if _is_tensor_type(sample): + t = _as_tensor(sample, device=device).evaluate() + if t.device.device_type == "gpu": + tl_type = _backend.TensorListGPU + else: + tl_type = _backend.TensorListCPU + return Batch(tl_type.broadcast(t._backend, batch_size)) + import numpy as np + + with nvtx.annotate("to numpy and stack", domain="batch"): + arr = np.array(sample) + converted_dtype_id = None + if arr.dtype == np.float64: + arr = arr.astype(np.float32) + elif arr.dtype == np.int64: + arr = arr.astype(np.int32) + elif arr.dtype == np.uint64: + arr = arr.astype(np.uint32) + elif arr.dtype == object: + arr, converted_dtype_id = _try_convert_enums(arr) + arr = np.repeat(arr[np.newaxis], batch_size, axis=0) + + with nvtx.annotate("to backend", domain="batch"): + tl = _backend.TensorListCPU(arr) + if converted_dtype_id is not None: + tl.reinterpret(converted_dtype_id) + with nvtx.annotate("create batch", domain="batch"): + return Batch(tl, device=device) + + @property + def dtype(self) -> DType: + if self._dtype is None: + if self._backend is not None: + self._dtype = DType.from_type_id(self._backend.dtype) + elif self._invocation_result is not None: + self._dtype = _dtype(self._invocation_result.dtype) + elif self._tensors: + self._dtype = self._tensors[0].dtype + else: + raise ValueError("Cannot establish the number of dimensions of an empty Batch") + return self._dtype + + @property + def device(self) -> Device: + if self._device is None: + if self._invocation_result is not None: + self._device = self._invocation_result.device + elif self._tensors: + self._device = self._tensors[0].device + else: + raise 
ValueError("Cannot establish the number of dimensions of an empty Batch") + return self._device + + @property + def layout(self) -> str: + if self._layout is None: + if self._invocation_result is not None: + self._layout = self._invocation_result.layout + elif self._backend is not None: + self._layout = self._backend.layout() + if self._layout == "" and self._ndim != 0: + self._layout = None + elif self._tensors: + self._layout = self._tensors[0].layout + else: + raise ValueError("Cannot establish the number of dimensions of an empty Batch") + # Use "" to indicate that the layout has been checked and is empty, but still return None + # to avoid situations where we return a string with a length that doesn't match the number + # of dimensions. + return self._layout or None + + @property + def ndim(self) -> int: + if self._ndim is None: + if self._invocation_result is not None: + self._ndim = self._invocation_result.ndim + elif self._backend is not None: + self._ndim = self._backend.ndim() + elif self._tensors: + self._ndim = self._tensors[0].ndim + else: + raise ValueError("Cannot establish the number of dimensions of an empty Batch") + return self._ndim + + @property + def tensors(self): + return _TensorList(self) + + def to_device(self, device: Device, force_copy: bool = False) -> "Batch": + if device is not None and not isinstance(device, Device): + device = Device(device) + if self.device == device and not force_copy: + return self + else: + with device: + from . import copy + + ret = copy(self, device=device) + return ret + + def cpu(self) -> "Batch": + return self.to_device(Device("cpu")) + + def gpu(self, index: Optional[int] = None) -> "Batch": + return self.to_device(Device("gpu", index)) + + def assign(self, other: "Batch"): + if other is self: + return + self._device = other._device + self._dtype = other._dtype + self._layout = other._layout + self._backend = other._backend + if other._tensors is not None: + self._tensors = [t for t in other._tensors] # copy the list + else: + self._tensors = None + self._invocation_result = other._invocation_result + self._wraps_external_data = other._wraps_external_data + + @property + def slice(self): + """Interface for samplewise slicing. + + Regular slicing selects samples first and then slices each sample with common + slicing parameters. + + Samplewise slicing interface allows the slicing parmaters to be batches (with the same + number of samples) and the slicing parameters are applied to respective samples. + + ```Python + start = Batch([1, 2, 3]) + stop = Batch([4, 5, 6]) + step = Batch([1, 1, 2]) + sliced = input.slice[start, stop, step] + # the result is equivalent to + sliced = Batch([ + sample[start[i]:stop[i]:step[i]] + for i, sample in enumerate(input) + ]) + ``` + + If the slicing parameters are not batches, they are broadcast to all samples. 
+ """ + return BatchedSlice(self) + + def __iter__(self): + return iter(self.tensors) + + def select(self, r): + if r is ...: + return self + if isinstance(r, slice): + return Batch(self.tensors[r]) + elif isinstance(r, list): + return Batch(self.tensors[r]) + else: + return self._get_tensor(r) + + def _get_tensor(self, i): + if self._tensors is None: + self._tensors = [None] * self.batch_size + + t = self._tensors[i] + if t is None: + t = self._tensors[i] = Tensor(batch=self, index_in_batch=i) + if self._backend: + t._backend = self._backend[i] + return t + + def _plain_slice(self, ranges): + def _is_batch(x): + return _get_batch_size(x) is not None + + for r in ranges: + is_batch_arg = _is_batch(r) + if isinstance(r, slice): + if _is_batch(r.start) or _is_batch(r.stop) or _is_batch(r.step): + is_batch_arg = True + if is_batch_arg: + raise ValueError( + "Cannot use a batch as an index or slice. in ``Batch.__getitem__``.\n" + "Use ``.slice`` property to perform samplewise slicing." + ) + return self.slice.__getitem__(ranges) + + @property + def batch_size(self) -> int: + if self._backend is not None: + return len(self._backend) + elif self._tensors is not None: + return len(self._tensors) + elif self._invocation_result is not None: + return self._invocation_result.batch_size + else: + raise ValueError("Neither tensors nor invocation result are set") + + @property + def shape(self): + if self._invocation_result is not None: + return self._invocation_result.shape + if self._backend is not None: + return self._backend.shape() + else: + assert self._tensors is not None + return [t.shape for t in self._tensors] + + def __str__(self) -> str: + return "Batch(\n" + str(self.evaluate()._backend) + ")" + + def evaluate(self): + with _EvalContext.get() as ctx: + if self._backend is None: + if self._invocation_result is not None: + self._backend = self._invocation_result.value(ctx) + else: + with self._device: + if self._device.device_type == "cpu": + backend_type = _backend.TensorListCPU + elif self._device.device_type == "gpu": + backend_type = _backend.TensorListGPU + else: + raise ValueError( + f"Internal error: " + f"Unsupported device type: {self._device.device_type}" + ) + self._backend = backend_type( + [t.evaluate()._backend for t in self._tensors], self.layout + ) + return self + + def __add__(self, other): + return _arithm_op("add", self, other) + + def __radd__(self, other): + return _arithm_op("add", other, self) + + def __sub__(self, other): + return _arithm_op("sub", self, other) + + def __rsub__(self, other): + return _arithm_op("sub", other, self) + + def __mul__(self, other): + return _arithm_op("mul", self, other) + + def __rmul__(self, other): + return _arithm_op("mul", other, self) + + def __pow__(self, other): + return _arithm_op("pow", self, other) + + def __rpow__(self, other): + return _arithm_op("pow", other, self) + + def __truediv__(self, other): + return _arithm_op("fdiv", self, other) + + def __rtruediv__(self, other): + return _arithm_op("fdiv", other, self) + + def __floordiv__(self, other): + return _arithm_op("div", self, other) + + def __rfloordiv__(self, other): + return _arithm_op("div", other, self) + + def __neg__(self): + return _arithm_op("minus", self) + + # Short-circuiting the execution, unary + is basically a no-op + def __pos__(self): + return self + + def __eq__(self, other): + return _arithm_op("eq", self, other) + + def __ne__(self, other): + return _arithm_op("neq", self, other) + + def __lt__(self, other): + return _arithm_op("lt", self, other) + + 
def __le__(self, other): + return _arithm_op("leq", self, other) + + def __gt__(self, other): + return _arithm_op("gt", self, other) + + def __ge__(self, other): + return _arithm_op("geq", self, other) + + def __and__(self, other): + return _arithm_op("bitand", self, other) + + def __rand__(self, other): + return _arithm_op("bitand", other, self) + + def __or__(self, other): + return _arithm_op("bitor", self, other) + + def __ror__(self, other): + return _arithm_op("bitor", other, self) + + def __xor__(self, other): + return _arithm_op("bitxor", self, other) + + def __rxor__(self, other): + return _arithm_op("bitxor", other, self) + + +def batch( + tensors: Union[Batch, Sequence[Any]], + dtype: Optional[DType] = None, + device: Optional[Device] = None, + layout: Optional[str] = None, +): + """Constructs a Batch object. + + Constructs a batch by copying the input tensors and optionally converting them to the desired + data type and storing on the specified device. + + Parameters + ---------- + tensors : TensorLike, default: None + The data to construct the batch from. Can be a list of tensors, a TensorList, + or other supported types. + Supported types are: + - a Batch object; the batch is copied and the data is converted and moved to the + specified device, if necessary + - a list of tensor-like objects; the objects need to have matching number of dimensions, + data types and layouts, + - a tensor-like object; the outermost dimenion is interpreted as the batch dimension + - a dali.backend.TensorListCPU or dali.backend.TensorListGPU + dtype : DType, default: None + The desired data type of the batch. If not specified, the data type is inferred + from the input tensors. If specified, the input tensors are cast to the desired data type. + The `dtype` is required if tensors are an empty list. + device : Device or str, optional, default: None + The device on which the batch should reside (e.g., "cpu" or "gpu"). + If not specified, the device is inferred from the input tensors. + layout : str, optional, default: None + The layout string describing the dimensions of the batch (e.g., "HWC"). + If not specified, the layout is inferred from the input tensors. + """ + if isinstance(tensors, Batch): + b = tensors.to_device(device or tensors.device, force_copy=True) + if dtype is not None and b.dtype != dtype: + from . import cast + + b = cast(b, dtype=dtype, device=device) + return b.evaluate() + else: + return Batch(tensors, dtype=dtype, device=device, layout=layout, copy=True) + + +def as_batch( + tensors: Union[Batch, Sequence[Any]], + dtype: Optional[DType] = None, + device: Optional[Device] = None, + layout: Optional[str] = None, +): + """Constructs a Batch object, avoiding the copy. + + Constructs a batch by viewing the input tensors as a batch. If the input tensors do not + reside on the specified device or do not match the desired type, the data will be converted + and/or copied, as necessary. + + Parameters + ---------- + tensors : TensorLike, default: None + The data to construct the batch from. It can be a list of tensors, a TensorList, + or other supported types. In general, the input tensors must be kept alive by the caller + until the batch is no longer needed. 
+ Supported types are: + - a Batch object; the batch is copied and the data is converted and moved to the + specified device, if necessary + - a list of tensor-like objects; the objects need to have matching number of dimensions, + data types and layouts, + - a tensor-like object; the outermost dimenion is interpreted as the batch dimension + - a dali.backend.TensorListCPU or dali.backend.TensorListGPU + dtype : DType, default: None + The desired data type of the batch. If not specified, the data type is inferred + from the input tensors. If specified, the input tensors are cast to the desired data type. + The `dtype` is required if `tensors` are an empty list. + device : Device or str, optional, default: None + The device on which the batch should reside (e.g., "cpu" or "gpu"). + If not specified, the device is inferred from the input tensors. + layout : str, optional, default: None + The layout string describing the dimensions of the batch (e.g., "HWC"). + If not specified, the layout is inferred from the input tensors. + """ + if isinstance(tensors, Batch): + b = tensors + if device is not None: + b = tensors.to_device(device) + if dtype is not None and b.dtype != dtype: + from . import cast + + b = cast(b, dtype=dtype, device=device) + return b + else: + return Batch(tensors, dtype=dtype, device=device, layout=layout) + + +__all__ = ["Batch", "batch", "as_batch"] diff --git a/dali/python/nvidia/dali/experimental/dali2/_device.py b/dali/python/nvidia/dali/experimental/dali2/_device.py index dac48176ec..c739e3395f 100644 --- a/dali/python/nvidia/dali/experimental/dali2/_device.py +++ b/dali/python/nvidia/dali/experimental/dali2/_device.py @@ -14,35 +14,43 @@ import nvidia.dali.backend as _backend from threading import local - +from typing import Union, Optional class Device: _thread_local = local() def __init__(self, name: str, device_id: int = None): + device_type, name_device_id = Device.split_device_type_and_id(name) + if name_device_id is not None and device_id is not None: + raise ValueError(f"Invalid device name: {name}\n" + f"Ordinal ':{name_device_id}' should not appear " + "in device name when device_id is provided") if device_id is None: - type_and_id = name.split(":") - if len(type_and_id) < 1 or len(type_and_id) > 2: - raise ValueError(f"Invalid device name: {name}") - device_type = type_and_id[0] - if len(type_and_id) == 2: - device_id = int(type_and_id[1]) - else: - if ":" in name: - raise ValueError( - f"Invalid device name: {name}\n" - f"':' should not appear in device name when device_id is provided" - ) - device_type = name + device_id = name_device_id + + if device_type == "cuda": + device_type = "gpu" Device.validate_device_type(device_type) if device_id is not None: Device.validate_device_id(device_id, device_type) else: device_id = Device.default_device_id(device_type) + self.device_type = device_type self.device_id = device_id + @staticmethod + def split_device_type_and_id(name: str) -> tuple[str, int]: + type_and_id = name.split(":") + if len(type_and_id) < 1 or len(type_and_id) > 2: + raise ValueError(f"Invalid device name: {name}") + device_type = type_and_id[0] + device_id = None + if len(type_and_id) == 2: + device_id = int(type_and_id[1]) + return device_type, device_id + @staticmethod def default_device_id(device_type: str) -> int: if device_type == "cpu": @@ -135,3 +143,43 @@ def __exit__(self, exc_type, exc_value, traceback): Device._thread_local.devices = None Device._thread_local.previous_device_ids = None + + +def device(obj: Union[Device, str, 
"torch.device"], id: Optional[int] = None) -> Device: + """ + Returns a Device object from various input types. + + - If `obj` is already a `Device`, returns it. In this case, `id` must be `None`. + - If `obj` is a `str`, parses it as a device name (e.g., `"gpu"`, `"cpu:0"`, `"cuda:1"`). In this case, `id` can be specified. + Note: If the string already contains a device id and `id` is also provided, a `ValueError` is raised. + - If `obj` is a `torch.device`, converts it to a `Device`. In this case, `id` must be `None`. + - If `obj` is None, returns it. + - If `obj` is not a `Device`, `str`, or `torch.device` or None, raises a `TypeError`. + """ + + # None + if obj is None: + return obj + + # Device instance + if isinstance(obj, Device): + if id is not None: + raise ValueError("Cannot specify id when passing a Device instance") + return obj + + if isinstance(obj, str): + return Device(obj, id) + + # torch.device detected by duck-typing + is_torch_device = ( + obj.__class__.__module__ == "torch" and + obj.__class__.__name__ == "device" and + hasattr(obj, "type") and + hasattr(obj, "index")) + if is_torch_device: + dev_type = "gpu" if obj.type == "cuda" else obj.type + if id is not None: + raise ValueError("Cannot specify id when passing a torch.device") + return Device(dev_type, obj.index) + + raise TypeError(f"Cannot convert {type(obj)} to Device") diff --git a/dali/python/nvidia/dali/experimental/dali2/_eval_context.py b/dali/python/nvidia/dali/experimental/dali2/_eval_context.py index 17e9438fe7..bb38993ec5 100644 --- a/dali/python/nvidia/dali/experimental/dali2/_eval_context.py +++ b/dali/python/nvidia/dali/experimental/dali2/_eval_context.py @@ -43,7 +43,9 @@ def __init__(self, num_threads=None, device_id=None, cuda_stream=None): if self._cuda_stream is None and self._device.device_type == "gpu": self._cuda_stream = _b.Stream(self._device.device_id) - self._thread_pool = _b._ThreadPool(num_threads or default_num_threads) + self._thread_pool = _b._ThreadPool( + num_threads or default_num_threads, self._device.device_id + ) @staticmethod def current(): diff --git a/dali/python/nvidia/dali/experimental/dali2/_fn.py b/dali/python/nvidia/dali/experimental/dali2/_fn.py new file mode 100644 index 0000000000..57a9cb0906 --- /dev/null +++ b/dali/python/nvidia/dali/experimental/dali2/_fn.py @@ -0,0 +1,26 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import ops +from . import _op_builder + + +def _initialize(): + for op in ops._all_ops: + if op.op_name.startswith("_"): + continue + if op.schema.IsStateful(): + continue + + _op_builder.build_fn_wrapper(op) diff --git a/dali/python/nvidia/dali/experimental/dali2/_op_builder.py b/dali/python/nvidia/dali/experimental/dali2/_op_builder.py new file mode 100644 index 0000000000..dc5c49a19f --- /dev/null +++ b/dali/python/nvidia/dali/experimental/dali2/_op_builder.py @@ -0,0 +1,508 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import nvidia.dali.backend as _b +from nvidia.dali.fn import _to_snake_case +import makefun +from ._batch import Batch, _get_batch_size +from ._tensor import Tensor +from . import ops +from . import _type +import types +import copy +from . import _invocation, _device, _eval_mode, _eval_context +import nvidia.dali.ops as _ops +import nvidia.dali.types +import nvtx + + +def is_external(x): + if isinstance(x, Tensor): + return x._is_external() + if isinstance(x, Batch): + return x._is_external() + return False + + +def _scalar_decay(x): + if isinstance(x, _device.Device): + return x.device_type + if isinstance(x, _type.DType): + return x.type_id + if x is str: + return nvidia.dali.types.STRING + if x is bool: + return nvidia.dali.types.BOOL + if x is int or x is float: + raise ValueError( + f"Do not use Python built-in type {x} as an argument. " + f"Use one of the DALI types instead." + ) + return x + + +def _get_input_device(x): + with nvtx.annotate("get_input_device", domain="op_builder"): + if x is None: + return None + if isinstance(x, Batch): + return x.device + if isinstance(x, Tensor): + return x.device + if isinstance(x, _b.TensorListCPU): + return _device.Device("cpu") + if isinstance(x, _b.TensorListGPU): + return _device.Device("gpu") + if hasattr(x, "__cuda_array_interface__"): + return _device.Device("gpu") + if hasattr(x, "__dlpack_device__"): + dev = x.__dlpack_device__() + if int(dev[0]) == 1 or int(dev[0]) == 3: # CPU or CPU_PINNED + return _device.Device("cpu") + elif int(dev[0]) == 2: + return _device.Device("gpu", dev[1]) + else: + raise ValueError(f"Unknown DLPack device type: {dev.type}") + if hasattr(x, "__dlpack__"): + return _device.Device("cpu") + if isinstance(x, list) and x: + return _get_input_device(x[0]) + return None + + +def _get_input_device_type(x): + dev = _get_input_device(x) + return dev.device_type if dev is not None else None + + +def _to_tensor(x, device=None): + with nvtx.annotate("to_tensor", domain="op_builder"): + if x is None: + return None + if isinstance(x, Tensor): + if device is not None: + return x.to_device(device) + return x + if isinstance(x, _invocation.InvocationResult): + if x.is_batch: + raise ValueError("Batch invocation result cannot be used as a single tensor") + return Tensor(invocation_result=x, device=device) + return Tensor(x, device=device) + + +def _to_batch(x, batch_size, device=None): + with nvtx.annotate("to_batch", domain="op_builder"): + if x is None: + return None + if isinstance(x, Batch): + if device is not None: + return x.to_device(device) + return x + if isinstance(x, _invocation.InvocationResult): + if x.is_batch: + return Batch(invocation_result=x, device=device) + else: + x = _to_tensor(x) # fall back to regular replication + actual_batch_size = _get_batch_size(x) + if actual_batch_size is not None: + if batch_size is not None and actual_batch_size != batch_size: + raise ValueError(f"Unexpected batch size: {actual_batch_size} != {batch_size}") + return Batch(x, 
device=device) + + return Batch.broadcast(x, batch_size, device=device) + + +_unsupported_args = {"bytes_per_sample_hint", "preserve"} + + +def _find_or_create_module(root_module, module_path): + module = root_module + for path_part in module_path: + submodule = getattr(module, path_part, None) + if submodule is None: + submodule = types.ModuleType(path_part) + setattr(module, path_part, submodule) + module = submodule + return module + + +def build_operator_class(schema): + class_name = schema.OperatorName() + module_path = schema.ModulePath() + is_reader = "readers" in module_path + if is_reader: + from .. import dali2 as parent + + module = parent + else: + module = ops + legacy_op_class = None + import nvidia.dali.ops + + legacy_op_module = nvidia.dali.ops + for path_part in module_path: + legacy_op_module = getattr(legacy_op_module, path_part) + module = _find_or_create_module(module, module_path) + + legacy_op_class = getattr(legacy_op_module, class_name) + base = ops.Operator + if "readers" in module.__name__: + base = ops.Reader + op_class = type(class_name, (base,), {}) + op_class.schema = schema + op_class.op_name = class_name + op_class.fn_name = _to_snake_case(class_name) + op_class.legacy_op = legacy_op_class + op_class.is_stateful = schema.IsStateful() + op_class._instance_cache = {} + op_class.__init__ = build_constructor(schema, op_class) + op_class.__call__ = build_call_function(schema, op_class) + op_class.__module__ = module.__name__ + op_class.__qualname__ = class_name + setattr(module, class_name, op_class) + return op_class + + +def build_constructor(schema, op_class): + stateful = op_class.is_stateful + function_name = "__init__" + + call_args = [] + for arg in schema.GetArgumentNames(): + if arg in _unsupported_args: + continue + if schema.IsTensorArgument(arg): + continue + if schema.IsArgumentOptional(arg): + call_args.append(f"{arg}=None") + else: + call_args.append(arg) + + if call_args: + call_args = ["*"] + call_args + header_args = [ + "self", + "max_batch_size=None", + "name=None", + 'device="cpu"', + "num_inputs=None", + "call_arg_names=None", + ] + call_args + header = f"__init__({', '.join(header_args)})" + + def init(self, max_batch_size, name, **kwargs): + kwargs = {k: _scalar_decay(v) for k, v in kwargs.items()} + op_class.__base__.__init__(self, max_batch_size, name, **kwargs) + if stateful: + self._call_id = 0 + + function = makefun.create_function(header, init) + function.__qualname__ = f"{op_class.__name__}.{function_name}" + + return function + + +def build_call_function(schema, op_class): + stateful = op_class.is_stateful + call_args = [] + for arg in schema.GetArgumentNames(): + if arg in _unsupported_args: + continue + if not schema.IsTensorArgument(arg): + continue + if schema.IsArgumentOptional(arg): + call_args.append(f"{arg}=None") + else: + call_args.append(arg) + + inputs = [] + min_inputs = schema.MinNumInput() + max_inputs = schema.MaxNumInput() + input_indices = {} + arguments = schema.GetArgumentNames() + for i in range(max_inputs): + if schema.HasInputDox(): + input_name = schema.GetInputName(i) + if input_name in arguments: + input_name += "_input" + else: + input_name = f"input_{i}" + input_indices[input_name] = i + if i < min_inputs: + inputs.append(f"{input_name}") + else: + inputs.append(f"{input_name}=None") + + call_args = ["*", "batch_size=None"] + call_args + if inputs: + inputs = inputs + ["/"] + header = f"__call__({', '.join(['self'] + inputs + call_args)})" + + def call(self, *raw_args, batch_size=None, **raw_kwargs): + 
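+        # Overview of the generated __call__:
+        # 1. Determine whether this is a batch call and infer the batch size from the
+        #    inputs and tensor arguments (falling back to max_batch_size for single samples).
+        # 2. Convert inputs and tensor arguments to Batch or Tensor objects on the device
+        #    expected by the operator.
+        # 3. Wrap the call in an Invocation object.
+        # 4. Evaluate eagerly when the evaluation mode or external inputs require it;
+        #    otherwise register the invocation with the current EvalContext for lazy evaluation.
+        # The outputs are returned as Batch/Tensor wrappers around the invocation's results.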
with nvtx.annotate(f"__call__: {self.op_name}", domain="op_builder"): + self._pre_call(*raw_args, **raw_kwargs) + with nvtx.annotate("__call__: get batch size", domain="op_builder"): + is_batch = batch_size is not None + if batch_size is None: + for i, x in enumerate(list(raw_args) + list(raw_kwargs.values())): + x_batch_size = _get_batch_size(x) + if x_batch_size is not None: + is_batch = True + if batch_size is not None: + if x_batch_size != batch_size: + raise ValueError( + f"Inconsistent batch size: {x_batch_size} != {batch_size}" + ) + else: + batch_size = x_batch_size + if not is_batch: + batch_size = self._max_batch_size or 1 + + inputs = [] + kwargs = {} + + if is_batch: + with nvtx.annotate("__call__: convert to batches", domain="op_builder"): + for i, inp in enumerate(raw_args): + if inp is None: + continue + input_device = self.input_device(i, _get_input_device_type(inp)) + inp = _to_batch(inp, batch_size, device=input_device) + inputs.append(inp) + for k, v in raw_kwargs.items(): + if v is None: + continue + kwargs[k] = _to_batch(v, batch_size, device=_device.Device("cpu")) + else: + with nvtx.annotate("__call__: convert to tensors", domain="op_builder"): + for inp in raw_args: + if inp is None: + continue + inputs.append(_to_tensor(inp)) + for k, v in raw_kwargs.items(): + if v is None: + continue + kwargs[k] = _to_tensor(v) + + with nvtx.annotate("__call__: shallowcopy", domain="op_builder"): + inputs = [copy.copy(x) for x in inputs] + kwargs = {k: copy.copy(v) for k, v in kwargs.items()} + + if stateful: + call_id = self._call_id + self._call_id += 1 + else: + call_id = None + with nvtx.annotate("__call__: construct Invocation", domain="op_builder"): + invocation = _invocation.Invocation( + self, + call_id, + inputs, + kwargs, + is_batch=is_batch, + batch_size=batch_size, + previous_invocation=self._last_invocation, + ) + + if stateful: + self._last_invocation = invocation + + if ( + _eval_mode.EvalMode.current() == _eval_mode.EvalMode.eager + or _eval_mode.EvalMode.current() == _eval_mode.EvalMode.sync_cpu + or _eval_mode.EvalMode.current() == _eval_mode.EvalMode.sync_full + or ( + _eval_mode.EvalMode.current() == _eval_mode.EvalMode.default + and ( + any(is_external(x) for x in inputs) + or any(is_external(x) for x in kwargs.values()) + ) + ) + ): + # Evaluate immediately + invocation.run(_eval_context.EvalContext.get()) + else: + # Lazy evaluation + # If there's an active evaluation context, add this invocation to it. + # When leaving the context, the invocation will be evaluated if it's still alive. + ctx = _eval_context.EvalContext.current() + if ctx is not None: + ctx._add_invocation(invocation, weak=not self.is_stateful) + + if is_batch: + if len(invocation) == 1: + return Batch(invocation_result=invocation[0]) + else: + return tuple( + Batch(invocation_result=invocation[i]) for i in range(len(invocation)) + ) + else: + if len(invocation) == 1: + return Tensor(invocation_result=invocation[0]) + else: + return tuple( + Tensor(invocation_result=invocation[i]) for i in range(len(invocation)) + ) + + function = makefun.create_function(header, call) + + return function + + +def _next_pow2(x): + return 1 << (x - 1).bit_length() + + +def build_fn_wrapper(op): + schema = op.schema + module_path = schema.ModulePath() + from .. 
import dali2 as parent + + module = parent + for path_part in module_path: + new_module = getattr(module, path_part, None) + if new_module is None: + new_module = types.ModuleType(path_part) + setattr(module, path_part, new_module) + module = new_module + + fn_name = _to_snake_case(op.schema.OperatorName()) + inputs = [] + min_inputs = schema.MinNumInput() + max_inputs = schema.MaxNumInput() + input_indices = {} + arguments = schema.GetArgumentNames() + for i in range(max_inputs): + if schema.HasInputDox(): + input_name = schema.GetInputName(i) + if input_name in arguments: + input_name += "_input" + else: + input_name = f"input_{i}" + input_indices[input_name] = i + if i < min_inputs: + inputs.append(f"{input_name}") + else: + inputs.append(f"{input_name}=None") + + fixed_args = [] + tensor_args = [] + signature_args = ["batch_size=None, device=None"] + for arg in op.schema.GetArgumentNames(): + if arg in _unsupported_args: + continue + if op.schema.IsTensorArgument(arg): + tensor_args.append(arg) + else: + fixed_args.append(arg) + if op.schema.IsArgumentOptional(arg): + signature_args.append(f"{arg}=None") + else: + signature_args.append(arg) + + if signature_args: + signature_args = ["*"] + signature_args + if inputs: + inputs = inputs + ["/"] + header = f"{fn_name}({', '.join(inputs + signature_args)})" + + def fn_call(*inputs, batch_size=None, device=None, **raw_kwargs): + if batch_size is None: + for x in inputs: + x_batch_size = _get_batch_size(x) + if x_batch_size is not None: + batch_size = x_batch_size + break + if batch_size is None: + for arg in raw_kwargs.values(): + x_batch_size = _get_batch_size(arg) + if x_batch_size is not None: + batch_size = x_batch_size + break + max_batch_size = _next_pow2(batch_size or 1) + init_args = { + arg: _scalar_decay(raw_kwargs[arg]) + for arg in fixed_args + if arg != "max_batch_size" and arg in raw_kwargs and raw_kwargs[arg] is not None + } + call_args = { + arg: _scalar_decay(raw_kwargs[arg]) + for arg in tensor_args + if arg in raw_kwargs and raw_kwargs[arg] is not None + } + # If device is not specified, infer it from the inputs and call_args + if device is None: + + def _infer_device(): + for inp in inputs: + if inp is None: + continue + dev = _get_input_device(inp) + if dev is not None and dev.device_type == "gpu": + return dev + for arg in raw_kwargs.values(): + if arg is None: + continue + dev = _get_input_device(arg) + if dev is not None and dev.device_type == "gpu": + return dev + return _device.Device("cpu") + + device = _infer_device() + elif not isinstance(device, _device.Device): + device = _device.Device(device) + + # Get or create the operator instance that matches the arguments + with nvtx.annotate(f"get instance {op.op_name}", domain="op_builder"): + op_inst = op.get( + max_batch_size=max_batch_size, + name=None, + device=device, + num_inputs=len(inputs), + call_arg_names=tuple(call_args.keys()), + **init_args, + ) + + # Call the operator (the result is an Invocation object) + return op_inst(*inputs, batch_size=batch_size, **call_args) + + function = makefun.create_function(header, fn_call) + function.op_class = op + function.schema = schema + setattr(module, fn_name, function) + return function + + +def build_operators(): + _all_ops = _ops._registry._all_registered_ops() + all_op_classes = [] + deprecated = {} + op_map = {} + for op_name in _all_ops: + if op_name.endswith("ExternalSource") or op_name.endswith("PythonFunction"): + continue + + schema = _b.GetSchema(op_name) + deprecated_in_favor = 
schema.DeprecatedInFavorOf() + if deprecated_in_favor: + deprecated[op_name] = deprecated_in_favor + cls = build_operator_class(schema) + all_op_classes.append(cls) + op_map[op_name] = cls + for deprecated, in_favor in deprecated.items(): + schema = _b.GetSchema(deprecated) + module = _find_or_create_module(ops, schema.ModulePath()) + setattr(module, deprecated, op_map[in_favor]) + + return all_op_classes diff --git a/dali/python/nvidia/dali/experimental/dali2/_tensor.py b/dali/python/nvidia/dali/experimental/dali2/_tensor.py new file mode 100644 index 0000000000..9727f7f943 --- /dev/null +++ b/dali/python/nvidia/dali/experimental/dali2/_tensor.py @@ -0,0 +1,815 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Optional, Tuple, Union +from ._type import DType, dtype as _dtype, type_id as _type_id +from ._device import Device +import nvidia.dali.backend as _backend +from ._eval_context import EvalContext as _EvalContext +from . import _eval_mode +from . import _invocation +import copy +import nvidia.dali.types + + +def _volume(shape: Tuple[int, ...]) -> int: + ret = 1 + for s in shape: + ret *= s + return ret + + +def _backend_device(backend: Union[_backend.TensorCPU, _backend.TensorGPU]) -> Device: + if isinstance(backend, _backend.TensorCPU): + return Device("cpu") + elif isinstance(backend, _backend.TensorGPU): + return Device("gpu", backend.device_id()) + else: + raise ValueError(f"Unsupported backend type: {type(backend)}") + + +def _get_array_interface(data): + if a_func := getattr(data, "__array__", None): + try: + return a_func() + except TypeError: # CUDA torch tensor, CuPy array, etc. + return None + else: + return None + + +def _try_convert_enums(arr): + assert arr.dtype == object + if arr.size == 0: + raise ValueError("Cannot convert an empty array of `object` type.") + item = arr.flat[0] + import numpy as np + + if isinstance(item, nvidia.dali.types.DALIInterpType): + return arr.astype(np.int32), nvidia.dali.types.INTERP_TYPE + elif isinstance(item, nvidia.dali.types.DALIDataType): + return arr.astype(np.int32), nvidia.dali.types.DATA_TYPE + elif isinstance(item, nvidia.dali.types.DALIImageType): + return arr.astype(np.int32), nvidia.dali.types.IMAGE_TYPE + else: + raise TypeError("Unexpected element type f{type(item)}") + + +class Tensor: + def __init__( + self, + data: Optional[Any] = None, + dtype: Optional[Any] = None, + device: Optional[Device] = None, + layout: Optional[str] = None, + batch: Optional[Any] = None, + index_in_batch: Optional[int] = None, + invocation_result: Optional[_invocation.InvocationResult] = None, + copy: bool = False, + ): + """Constructs a Tensor object. + Tensor objects should not be constructed directly, use tensor or as_tensor instead. + + The Tensor object can be created either from an existing object, passed as `data` or + from an invocation result. 
+ Unless explicitly requested with the `copy` parameter, this constructor will make best + effort to avoid the copy. + + Parameters + ---------- + data : TensorLike, default: None + The data to construct the tensor from. It can be a tensor-like object, a (nested) list, + TensorCPU/TensorGPU or other supported type. + dtype : DType, default: None + The desired data type of the tensor. If not specified, the data type is inferred + from the input data. If specified, the input data is cast to the desired data type. + device : Device or str, optional, default: None + The device on which the tensor should reside (e.g., "cpu" or "gpu"). + If not specified, the device is inferred from the input data. + layout : str, optional, default: None + The layout string describing the dimensions of the tensor (e.g., "HWC"). + If not specified, the layout is inferred from the input data, if possible. + batch : Batch, optional, default: None + Use if the tensor is a view of a sample in a batch. Used together with `index_in_batch`. + index_in_batch : int, optional, default: None + The index of the tensor in the batch. Used together with `batch`. + invocation_result : _invocation.InvocationResult, default: None + The result of a DALI operator invocation, used for lazy evaluation + copy : bool, optional, default: False + If True, the input data is copied. If False, the constructor will avoid + copying data when possible. + """ + if layout is None: + layout = "" + elif not isinstance(layout, str): + raise ValueError(f"Layout must be a string, got {type(layout)}") + + self._slice = None + self._backend = None + self._batch = batch + self._index_in_batch = index_in_batch + self._invocation_result = None + self._device = None + self._shape = None + self._dtype = None + self._layout = None + self._wraps_external_data = False + + if device is not None and not isinstance(device, Device): + device = Device(device) # TODO(michalz): Use the `device` function, when merged + + copied = False + + if dtype is not None: + if not isinstance(dtype, DType): + dtype = _dtype(dtype) + + if batch is not None: + from . import _batch + + if not isinstance(batch, _batch.Batch): + raise ValueError("The `batch` argument must be a `Batch`") + self._batch = batch + self._index_in_batch = index_in_batch + self._dtype = batch.dtype + self._device = batch.device + self._layout = batch.layout + elif data is not None: + if isinstance(data, (_backend.TensorCPU, _backend.TensorGPU)): + self._backend = data + self._wraps_external_data = True + self._device = _backend_device(data) + elif isinstance(data, Tensor): + if dtype is None or _type_id(dtype) == data.dtype.type_id: + if device is None or device == data.device: + self.assign(data) + self._wraps_external_data = data._wraps_external_data + else: + dev = data.to_device(device).evaluate() + if dev is not self: + copied = True + self.assign(dev) + self._wraps_external_data = not copied + else: + from . import cast + + converted = cast(data.to_device(device), dtype=dtype, device=self.device) + self.assign(converted.evaluate()) + copied = True + elif isinstance(data, TensorSlice): + self._slice = data + elif hasattr(data, "__dlpack_device__"): + dl_device_type, device_id = data.__dlpack_device__() + if int(dl_device_type) == 1: # CPU + self._backend = _backend.TensorCPU(data.__dlpack__(), layout) + elif int(dl_device_type) == 2: # GPU + # If the current context is on the same device, use the same stream. 
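+                    # Otherwise, create a dedicated stream on the data's device. The stream
+                    # handle is passed to __dlpack__ so that, per the DLPack protocol, the
+                    # producer can make the data available on that stream before it is consumed.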
+ ctx = _EvalContext.get() + if ctx.device_id == device_id: + stream = ctx.cuda_stream + else: + stream = _backend.Stream(device_id) + args = {"stream": stream.handle} + self._backend = _backend.TensorGPU( + data.__dlpack__(**args), + layout=layout, + stream=stream, + ) + else: + raise ValueError(f"Unsupported device type: {dl_device_type}") + self._wraps_external_data = True + elif a := _get_array_interface(data): + self._backend = _backend.TensorCPU(a, layout) + self._wraps_external_data = True + else: + import numpy as np + + if dtype is not None: + # TODO(michalz): Built-in enum handling + self._backend = _backend.TensorCPU( + np.array(data, dtype=nvidia.dali.types.to_numpy_type(dtype.type_id)), + layout, + False, + ) + copied = True + self._wraps_external_data = False + self._dtype = dtype + else: + arr = np.array(data) + # DALI doesn't support int64 and float64, so we need to convert them to int32 + # and float32, respectively. + converted_dtype_id = None + if arr.dtype == np.int64: + arr = arr.astype(np.int32) + elif arr.dtype == np.uint64: + arr = arr.astype(np.uint32) + elif arr.dtype == np.float64: + arr = arr.astype(np.float32) + elif arr.dtype == object: + (arr, converted_dtype_id) = _try_convert_enums(arr) + self._backend = _backend.TensorCPU(arr, layout, False) + if converted_dtype_id is not None: + self._backend.reinterpret(converted_dtype_id) + copied = True + self._wraps_external_data = False + + if self._backend is not None: + self._device = _backend_device(self._backend) + if device is None: + device = self._device + else: + if device is None: + device = Device("cpu") + self._device = device + + if self._backend is not None: + self._shape = tuple(self._backend.shape()) + self._dtype = DType.from_type_id(self._backend.dtype) + self._layout = self._backend.layout() + + if isinstance(self._backend, _backend.TensorCPU) and device != _backend_device( + self._backend + ): + self.assign(self.to_device(device).evaluate()) + elif invocation_result is not None: + self._invocation_result = invocation_result + self._device = invocation_result.device + else: + raise ValueError("Either data, expression or batch and index must be provided") + + if dtype is not None and self._dtype != dtype: + from . import cast + + self.assign(cast(self, dtype=dtype, device=self.device).evaluate()) + copied = True + + if _eval_mode.EvalMode.current().value >= _eval_mode.EvalMode.eager.value: + self.evaluate() + + if copy and self._backend is not None and not copied: + self.assign(self.to_device(device, True).evaluate()) + + def _is_external(self) -> bool: + return self._wraps_external_data + + def cpu(self) -> "Tensor": + return self.to_device(Device("cpu")) + + def gpu(self, index: Optional[int] = None) -> "Tensor": + return self.to_device(Device("gpu", index)) + + @property + def device(self) -> Device: + if self._device is not None: + return self._device + if self._invocation_result is not None: + self._device = self._invocation_result.device + return self._device + else: + raise RuntimeError("Device not set") + + def to_device(self, device: Device, force_copy: bool = False) -> "Tensor": + if self.device == device and not force_copy: + return self + else: + with device: + from . 
import copy + + return copy(self, device=device) + + def assign(self, other: "Tensor"): + if other is self: + return + self._device = other._device + self._shape = other._shape + self._dtype = other._dtype + self._layout = other._layout + self._backend = other._backend + self._slice = other._slice + self._batch = other._batch + self._index_in_batch = other._index_in_batch + self._invocation_result = other._invocation_result + self._wraps_external_data = other._wraps_external_data + + @property + def data(self): + if not self._backend: + self.evaluate() + return self._backend + + @property + def ndim(self) -> int: + if self._backend is not None: + return self._backend.ndim() + elif self._slice is not None: + return self._slice.ndim + elif self._invocation_result is not None: + return self._invocation_result.ndim + elif self._batch is not None: + return self._batch.ndim + else: + raise RuntimeError("Cannot determine the number of dimensions of the tensor.") + + @property + def shape(self) -> Tuple[int, ...]: + if self._shape is None: + if self._invocation_result is not None: + self._shape = self._invocation_result.shape + elif self._slice: + self._shape = self._slice.shape + elif self._batch is not None: + self._shape = self._batch.shape[self._index_in_batch] + else: + self._shape = tuple(self._backend.shape()) + return self._shape + + @property + def dtype(self) -> DType: + if self._dtype is None: + if self._invocation_result is not None: + self._dtype = _dtype(self._invocation_result.dtype) + elif self._slice: + self._dtype = self._slice.dtype + elif self._batch is not None: + self._dtype = self._batch.dtype + else: + self._dtype = _dtype(self._backend.dtype) + return self._dtype + + @property + def layout(self) -> str: + if self._layout is None: + if self._invocation_result is not None: + self._layout = self._invocation_result.layout + elif self._slice: + self._layout = self._slice.layout + elif self._batch is not None: + self._layout = self._batch.layout + else: + self._layout = self._backend.layout() + # Use "" to indicate that the layout has been checked and is empty, but still return None + # to avoid situations where we return a string with a length that doesn't match the number + # of dimensions. 
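+ # In other words: "" caches the fact that the layout is known to be empty,
+ # while the public property reports the absence of a layout as None.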
+ return self._layout or None + + @property + def size(self) -> int: + return _volume(self.shape) + + @property + def nbytes(self) -> int: + return self.size * self.dtype.bytes + + @property + def itemsize(self) -> int: + return self.dtype.bytes + + def item(self) -> Any: + if self.size != 1: + raise ValueError(f"Tensor has {self.size} elements, expected 1") + import numpy as np + + with _EvalContext.get(): + return np.array(self.cpu().evaluate()._backend).item() + + def evaluate(self): + if self._backend is None: + with _EvalContext.get() as ctx: + if self._slice: + self._backend = self._slice.evaluate()._backend + elif self._batch is not None: + t = self._batch._tensors[self._index_in_batch] + if t is self: + self._backend = self._batch.evaluate()._backend[self._index_in_batch] + else: + self._backend = t.evaluate()._backend + else: + assert self._invocation_result is not None + self._backend = self._invocation_result.value(ctx) + self._shape = tuple(self._backend.shape()) + self._dtype = DType.from_type_id(self._backend.dtype) + self._layout = self._backend.layout() + return self + + def __getitem__(self, ranges: Any) -> "Tensor": + if not isinstance(ranges, tuple): + ranges = (ranges,) + + if all(_is_full_slice(r) or r is Ellipsis for r in ranges): + return self + else: + if self._slice: + return self._slice.__getitem__(ranges) + else: + return Tensor(TensorSlice(self, ranges)) + + def _is_same_tensor(self, other: "Tensor") -> bool: + return ( + self._backend is other._backend + and self._invocation_result is other._invocation_result + and self._slice is other._slice + ) + + def __str__(self) -> str: + return "Tensor(\n" + str(self.evaluate()._backend) + ")" + + def __add__(self, other): + return _arithm_op("add", self, other) + + def __radd__(self, other): + return _arithm_op("add", other, self) + + def __sub__(self, other): + return _arithm_op("sub", self, other) + + def __rsub__(self, other): + return _arithm_op("sub", other, self) + + def __mul__(self, other): + return _arithm_op("mul", self, other) + + def __rmul__(self, other): + return _arithm_op("mul", other, self) + + def __pow__(self, other): + return _arithm_op("pow", self, other) + + def __rpow__(self, other): + return _arithm_op("pow", other, self) + + def __truediv__(self, other): + return _arithm_op("fdiv", self, other) + + def __rtruediv__(self, other): + return _arithm_op("fdiv", other, self) + + def __floordiv__(self, other): + return _arithm_op("div", self, other) + + def __rfloordiv__(self, other): + return _arithm_op("div", other, self) + + def __neg__(self): + return _arithm_op("minus", self) + + # Short-circuiting the execution, unary + is basically a no-op + def __pos__(self): + return self + + def __eq__(self, other): + return _arithm_op("eq", self, other) + + def __ne__(self, other): + return _arithm_op("neq", self, other) + + def __lt__(self, other): + return _arithm_op("lt", self, other) + + def __le__(self, other): + return _arithm_op("leq", self, other) + + def __gt__(self, other): + return _arithm_op("gt", self, other) + + def __ge__(self, other): + return _arithm_op("geq", self, other) + + def __and__(self, other): + return _arithm_op("bitand", self, other) + + def __rand__(self, other): + return _arithm_op("bitand", other, self) + + def __or__(self, other): + return _arithm_op("bitor", self, other) + + def __ror__(self, other): + return _arithm_op("bitor", other, self) + + def __xor__(self, other): + return _arithm_op("bitxor", self, other) + + def __rxor__(self, other): + return _arithm_op("bitxor", 
other, self) + + +def _arithm_op(name, *args, **kwargs): + argsstr = " ".join(f"&{i}" for i in range(len(args))) + from . import arithmetic_generic_op + + return arithmetic_generic_op(*args, expression_desc=f"{name}({argsstr})") + + +def _is_int_value(tested: Any, reference: int) -> bool: + return isinstance(tested, int) and tested == reference + + +def _is_full_slice(r: Any) -> bool: + if isinstance(r, slice): + return ( + (r.start is None or _is_int_value(r.start, 0)) + and (r.stop is None) + and (r.step is None or _is_int_value(r.step, 1)) + ) + else: + return False + + +def _is_index(r: Any) -> bool: + return not isinstance(r, slice) and r is not Ellipsis + + +def _clamp(value: int, lo: int, hi: int) -> int: + return max(lo, min(value, hi)) + + +def _scalar_value(value: Any) -> int: + if isinstance(value, int): + return value + elif isinstance(value, Tensor): + return value.item() + else: + raise ValueError(f"Unsupported type: {type(value)}") + + +class TensorSlice: + def __init__(self, tensor: Tensor, ranges: Tuple[Any, ...], absolute=False): + self._tensor = copy.copy(tensor) + self._ndim_dropped = 0 + self._shape = None + if absolute: + self._absolute_ranges = [copy.copy(r) for r in ranges] + self._ranges = self._insane_pythonic_ranges(self._absolute_ranges, tensor.shape) + else: + self._ranges = [copy.copy(r) for r in ranges] + self._absolute_ranges = None + self._layout = None + num_ranges = len(ranges) + ellipsis_found = False + for r in ranges: + if _is_index(r): + self._ndim_dropped += 1 + elif r is Ellipsis: + if ellipsis_found: + raise ValueError("Only one Ellipsis is allowed.") + num_ranges -= 1 + ellipsis_found = True + if num_ranges > tensor.ndim: + raise ValueError( + f"Number of ranges ({num_ranges}) " + f"is greater than the number of dimensions of the tensor ({tensor.ndim})" + ) + + @property + def ndim(self) -> int: + return self._tensor.ndim - self._ndim_dropped + + @property + def shape(self) -> Tuple[int, ...]: + if self._shape is None: + shape = [] + if self._absolute_ranges is None: + self._absolute_ranges = self._canonicalize_ranges(self._ranges, self._tensor.shape) + for r in self._absolute_ranges: + if isinstance(r, slice): + if r.step < 0: + shape.append((r.stop + r.step - r.start + 1) // r.step) + else: + shape.append((r.stop + r.step - r.start - 1) // r.step) + self._shape = tuple(shape) + return self._shape + + @property + def dtype(self) -> DType: + return self._tensor.dtype + + @property + def device(self) -> Device: + return self._tensor.device + + @property + def layout(self) -> str: + if self._layout is not None: + return self._layout + input_layout = self._tensor.layout + if self._ndim_dropped == 0 or input_layout == "" or input_layout is None: + self._layout = input_layout + return self._layout + + j = 0 + layout = "" + for i, r in enumerate(self._ranges): + if isinstance(r, slice): + layout += input_layout[j] + j += 1 + elif r is Ellipsis: + j += self._tensor.ndim - len(self._ranges) + 1 + else: + j += 1 # skip this dimension + self._layout = layout + return self._layout + + @staticmethod + def _canonicalize_ranges(ranges, in_shape) -> Tuple[int, ...]: + """Converts the ranges to sane non-pythonic values without negative indices wrapping""" + d = 0 + abs_ranges = [] + for i, r in enumerate(ranges): + if r is Ellipsis: + to_skip = len(in_shape) - len(ranges) + 1 + for _ in range(to_skip): + abs_ranges.append(slice(0, in_shape[d], 1)) + d += 1 + continue + if isinstance(r, slice): + step = _scalar_value(r.step) if r.step is not None else 1 + if step 
== 0: + raise ValueError("slice step cannot be zero") + extent = in_shape[d] + if r.start is not None: + start = _scalar_value(r.start) + if start < 0: + start += extent + else: + start = extent - 1 if step < 0 else 0 + if r.stop is not None: + stop = _scalar_value(r.stop) + if stop < 0: + stop += extent + else: + stop = -1 if step < 0 else extent + if step < 0: + stop = _clamp(stop, -1, extent - 1) + start = _clamp(start, stop, extent) + else: + start = _clamp(start, 0, extent) + stop = _clamp(stop, start, extent) + abs_ranges.append(slice(start, stop, step)) + else: + idx = _scalar_value(r) + if idx < 0: + idx += in_shape[d] + if idx < 0 or idx >= in_shape[d]: + raise IndexError( + f"Index {idx} is out of bounds for dimension {d} with size {in_shape[d]}" + ) + abs_ranges.append(idx) + d += 1 + while d < len(in_shape): + abs_ranges.append(slice(0, in_shape[d], 1)) + d += 1 + + return tuple(abs_ranges) + + @staticmethod + def _insane_pythonic_ranges(abs_ranges, shape) -> Tuple[int, ...]: + """Converts an absolute range into ranges as expected by Pythonic slicing API""" + py_ranges = [] + for r, s in zip(abs_ranges, shape): + if isinstance(r, slice): + stop = r.stop + # The exclusive `stop` for negative ranges could be -1, but it means + # something else in Python - so we need skip over the whole length of the + # array to make it really negative. + if r.step < 0: + if stop < 0: + stop -= s + py_ranges.append(slice(r.start, stop, r.step)) + else: + py_ranges.append(r) + return tuple(py_ranges) + + def __getitem__(self, ranges: Any) -> "Tensor": + if not isinstance(ranges, tuple): + ranges = (ranges,) + + if all(_is_full_slice(r) or r is Ellipsis for r in ranges): + return Tensor(self) + else: + ranges = self._canonicalize_ranges(ranges, self.shape) + abs_ranges = list(self._absolute_ranges) + i = 0 + for d, r in enumerate(self._absolute_ranges): + if isinstance(r, slice): + if isinstance(ranges[i], slice): + start = r.start + ranges[i].start * r.step + stop = r.start + ranges[i].stop * r.step + step = r.step * ranges[i].step + abs_ranges[d] = slice(start, stop, step) + else: + abs_ranges[d] = r.start + ranges[i] * r.step + i += 1 + result = TensorSlice(self._tensor, tuple(abs_ranges), True) + if _eval_mode.EvalMode.current().value >= _eval_mode.EvalMode.eager.value: + result.evaluate() + return Tensor(result) + + def evaluate(self): + with _EvalContext.get(): + if len(self._ranges) == 0: + return self._tensor.evaluate() + + if all(_is_full_slice(r) for r in self._ranges): + return self._tensor.evaluate() + + args = {} + d = 0 + for i, r in enumerate(self._ranges): + if r is Ellipsis: + d = self._tensor.ndim - len(self._ranges) + i + 1 + elif isinstance(r, slice): + if r.start is not None: + args[f"lo_{d}"] = r.start + if r.stop is not None and r.stop >= 0: + args[f"hi_{d}"] = r.stop + if r.step is not None: + args[f"step_{d}"] = r.step + d += 1 + else: + args[f"at_{d}"] = r + d += 1 + + from . import tensor_subscript + + return tensor_subscript(self._tensor, **args).evaluate() + + +def tensor( + data: Any, + dtype: Optional[Any] = None, + device: Optional[Device] = None, + layout: Optional[str] = None, +): + """Copies an existing tensor-like object into a DALI tensor. + + Parameters + ---------- + data : TensorLike, default: None + The data to construct the tensor from. It can be a tensor-like object, a (nested) list, + TensorCPU/TensorGPU or other supported type. 
+ Supported types are: + - numpy arrays + - torch tensors + - types exposing __dlpack__ or __array__ interface + - existing Tensor objects + dtype : DType, default: None + The desired data type of the tensor. If not specified, the data type is inferred + from the input data. If specified, the input data is cast to the desired data type. + device : Device or str, optional, default: None + The device on which the tensor should reside (e.g., "cpu" or "gpu"). + If not specified, the device is inferred from the input data. + layout : str, optional, default: None + The layout string describing the dimensions of the tensor (e.g., "HWC"). + If not specified, the layout is inferred from the input data, if possible. + """ + return Tensor(data, dtype=dtype, device=device, layout=layout, copy=True) + + +def as_tensor( + data: Any, + dtype: Optional[Any] = None, + device: Optional[Device] = None, + layout: Optional[str] = None, +): + """Wraps an existing tensor-like object into a DALI tensor. + + Parameters + ---------- + data : TensorLike, default: None + The data to construct the tensor from. It can be a tensor-like object, a (nested) list, + TensorCPU/TensorGPU or other supported type. + Supported types are: + - numpy arrays + - torch tensors + - types exposing __dlpack__ or __array__ interface + - existing Tensor objects + dtype : DType, default: None + The desired data type of the tensor. If not specified, the data type is inferred + from the input data. If specified, the input data is cast to the desired data type. + device : Device or str, optional, default: None + The device on which the tensor should reside (e.g., "cpu" or "gpu"). + If not specified, the device is inferred from the input data. + layout : str, optional, default: None + The layout string describing the dimensions of the tensor (e.g., "HWC"). + If not specified, the layout is inferred from the input data, if possible. + """ + from . import _batch + + if isinstance(data, _batch.Batch): + data = data.evaluate()._backend.as_tensor() + + return Tensor(data, dtype=dtype, device=device, layout=layout, copy=False) + + +__all__ = ["Tensor", "tensor", "as_tensor"] diff --git a/dali/python/nvidia/dali/experimental/dali2/ops.py b/dali/python/nvidia/dali/experimental/dali2/ops.py new file mode 100644 index 0000000000..15b12135aa --- /dev/null +++ b/dali/python/nvidia/dali/experimental/dali2/ops.py @@ -0,0 +1,434 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import _device +from . import _invocation +from . 
import _eval_context +import nvidia.dali as dali +from typing import Optional +import nvidia.dali.backend_impl as _b +from ._tensor import Tensor +from ._batch import Batch + + +class Operator: + def __init__( + self, + max_batch_size, + name=None, + device="cpu", + num_inputs=None, + call_arg_names=None, + **kwargs, + ): + self._name = name + self._max_batch_size = max_batch_size + self._init_args = kwargs + self._num_inputs = num_inputs + self._call_arg_names = None if call_arg_names is None else tuple(call_arg_names) + self._api_type = None + + from ._device import device as _to_device + self._device = _to_device(device) + + self._input_meta = [] + self._arg_meta = {} + self._num_outputs = None + self._output_devices = None + self._op_inst = None + self._op_backend = None + self._op_spec = None + self._last_invocation = None + + @classmethod + def get( + cls, + max_batch_size: int, + name: Optional[str] = None, + device: Optional[_device.Device] = None, + num_inputs: Optional[int] = None, + call_arg_names: Optional[list[str]] = None, + **init_args, + ): + if device is None: + device = _device.Device.current() + if not isinstance(device, _device.Device): + raise TypeError("device must be a Device instance") + + def freeze_arg(arg): + if isinstance(arg, list): + return tuple(arg) + return arg + + def freeze_args(args): + sorted_keys = sorted(args.keys()) + return tuple([(k, freeze_arg(args[k])) for k in sorted_keys]) + + call_arg_names = freeze_arg(call_arg_names) + key = (device, max_batch_size, num_inputs, call_arg_names, freeze_args(init_args)) + inst = cls._instance_cache.get(key, None) + if inst is None: + with device: + inst = cls( + max_batch_size, + name=name, + device=device, + num_inputs=num_inputs, + call_arg_names=call_arg_names, + **init_args, + ) + cls._instance_cache[key] = inst + return inst + + def infer_num_outputs(self, *inputs, **args): + self._init_spec(inputs, args) + return self._num_outputs + + def input_device(self, index: int, actual_device: Optional[str] = None): + default_input_device = "gpu" if self._device.device_type == "gpu" else "cpu" + dev_type = self.schema.GetInputDevice(index, actual_device, default_input_device) + if dev_type is None: + return self._device + return _device.Device(dev_type, self._device.device_id) # inherit the device id + + def infer_output_devices(self, *inputs, **args): + self._init_spec(inputs, args) + return self._output_devices + + def _pre_call(self, *inputs, **args): + pass + + def _is_backend_initialized(self): + return self._op_backend is not None + + def _reset_backend(self): + self._op_backend = None + self._op_spec = None + + def _init_spec(self, inputs, args): + if self._op_spec is None: + self._num_inputs = len(inputs) + self._call_arg_names = tuple(args.keys()) + import nvidia.dali as dali + + with self._device: + input_nodes = [ + dali.data_node.DataNode( + name=f"input_{i}", device=inputs[i].device.device_type, source=None + ) + for i in range(len(inputs)) + ] + arg_nodes = { + name: dali.data_node.DataNode(name=f"arg_{name}", device="cpu", source=None) + for name in args + } + op = self.legacy_op( + name=self._name, device=self._device.device_type, **self._init_args + ) + self._op_inst = op + out = op(*input_nodes, **arg_nodes) + if isinstance(out, (list, tuple)): + spec = out[0].source.spec + else: + spec = out.source.spec + + self._op_spec = spec + + if isinstance(out, (tuple, list)): + self._output_devices = [] + self._num_outputs = len(out) + for o in out: + device_type = o.device + device_id = 
self._device.device_id + self._output_devices.append(_device.Device(device_type, device_id)) + else: + self._num_outputs = 1 + self._output_devices = [_device.Device(out.device, self._device.device_id)] + + self._set_meta(inputs, args) + + def _init_backend(self, ctx, inputs, args): + if self._op_backend is not None: + return + + if ctx is None: + ctx = _eval_context.EvalContext.get() + with self._device: + with ctx: + self._init_spec(inputs, args) + if ctx._thread_pool is not None: + self._op_spec.AddArg("num_threads", ctx._thread_pool.num_threads) + else: + self._op_spec.AddArg("num_threads", 1) + self._op_spec.AddArg( + "device_id", + ( + self._device.device_id + if self._device.device_type == "gpu" or self._device.device_type == "mixed" + else dali.types.CPU_ONLY_DEVICE_ID + ), + ) + if self._max_batch_size is None: + self._max_batch_size = 1 + self._op_spec.AddArg("max_batch_size", self._max_batch_size) + self._op_backend = _b._Operator(self._op_spec) + + def run(self, ctx, *inputs, batch_size=None, **args): + if ( + batch_size is not None + and self._max_batch_size is not None + and batch_size > self._max_batch_size + and self.schema.IsStateful() + ): + raise RuntimeError( + f"The batch size {batch_size} is larger than the `max_batch_size` " + f"{self._max_batch_size} specified when the operator was created." + ) + + def _is_batch(): + nonlocal inputs, args + for input in inputs: + if isinstance(input, ((_b.TensorListCPU, _b.TensorListGPU))): + return True + for input in args.values(): + if isinstance(input, ((_b.TensorListCPU, _b.TensorListGPU))): + return True + return False + + is_batch = batch_size is not None or _is_batch() + if self._is_backend_initialized(): + if self.schema.IsStateful(): + # clearing the backend in a stateful op would destroy the state + self.check_compatible(inputs, batch_size, args) + elif not self.is_compatible(inputs, batch_size, args): + # we can reinitialize a stateless operator - not very efficient :( + self._reset_backend() + + self._init_backend(ctx, inputs, args) + workspace = _b._Workspace(ctx._thread_pool, ctx._cuda_stream) + for i, input in enumerate(inputs): + workspace.AddInput(self._to_batch(input).evaluate()._backend) + for name, arg in args.items(): + workspace.AddArgumentInput(name, self._to_batch(arg).evaluate()._backend) + self._op_backend.SetupAndRun(workspace, batch_size) + out = workspace.GetOutputs() + if is_batch: + return tuple(out) + else: + tensors = tuple(o[0] for o in out) + return tensors + + def _to_batch(self, x): + if not isinstance(x, Batch): + return Batch([x]) + else: + return x + + def _set_meta(self, inputs, args): + self._input_meta = [self._make_meta(input) for input in inputs] + self._arg_meta = {name: self._make_meta(arg) for name, arg in args.items()} + + def is_compatible(self, inputs, batch_size, args): + if batch_size is not None: + if batch_size > self._max_batch_size: + return False + if self._input_meta != [self._make_meta(input) for input in inputs]: + return False + if self._arg_meta != {name: self._make_meta(arg) for name, arg in args.items()}: + return False + return True + + def check_compatible(self, inputs, batch_size, args): + def error_header(): + return ( + f"The invocation of operator {self.display_name} " + f"is not compatible with the previous call:\n" + ) + + if batch_size is not None: + if batch_size > self._max_batch_size: + raise RuntimeError( + f"{error_header()}" + f"The batch size {batch_size} is larger than the `max_batch_size` " + f"{self._max_batch_size} specified when the operator was 
created." + ) + + if len(inputs) != len(self._input_meta): + raise RuntimeError( + f"{error_header()}" + f"The number of inputs ({len(inputs)}) does not match the number " + f"of inputs used in the previous call ({len(self._input_meta)})." + ) + for i, input in enumerate(inputs): + if self._input_meta[i] != self._make_meta(input): + raise RuntimeError( + f"{error_header()}" + f"The input {i} is not compatible with the input used in the previous call." + ) + for name, arg in args.items(): + if name not in self._arg_meta: + raise RuntimeError( + f"{error_header()}" f"The argument `{name}` was not used in the previous call." + ) + if self._arg_meta[name] != self._make_meta(arg): + raise RuntimeError( + f"{error_header()}" + f"The argument `{name}` is not compatible with the argument used in the " + f"previous call." + ) + for name in self._arg_meta: + if name not in args: + raise RuntimeError( + f"{error_header()}" + f"The argument `{name}` used in the previous call was not supplied in the " + f"current one." + ) + + def _make_meta(self, x): + is_batch = False + if isinstance(x, _invocation.Invocation): + is_batch = x.is_batch + elif isinstance(x, Batch): + is_batch = True + else: + is_batch = False + + return { + "is_batch": is_batch, + "ndim": x.ndim, + "layout": x.layout, + "dtype": x.dtype, + } + + @property + def display_name(self): + if "display_name" in self._init_args: + type_name = self._init_args["display_name"] + else: + type_name = self.schema.OperatorName() + if self._name is not None: + return f'type_name "{self._name}"' + else: + return type_name + + +class Reader(Operator): + def __init__( + self, + batch_size=None, + name=None, + device="cpu", + num_inputs=None, + call_arg_names=None, + **kwargs, + ): + if name is None: + name = f"Reader_{id(self)}" + self._actual_batch_size = batch_size + self._batch_size = batch_size + super().__init__( + self._actual_batch_size, name, device, num_inputs, call_arg_names, **kwargs + ) + + def _pre_call(self, *inputs, **args): + if self._api_type is None: + self._api_type = "run" + elif self._api_type != "run": + raise RuntimeError( + "Cannot mix `samples`, `batches` and `run`/`__call__` on the same reader." + ) + + def run(self, ctx=None, *inputs, **args): + if self._api_type is None: + self._api_type = "run" + elif self._api_type != "run": + raise RuntimeError( + "Cannot mix `samples`, `batches` and `run`/`__call__` on the same reader." + ) + + return super().run(ctx, *inputs, **args) + + def samples(self, ctx: Optional[_eval_context.EvalContext] = None): + if self._api_type is None: + self._api_type = "samples" + elif self._api_type != "samples": + raise RuntimeError( + "Cannot mix `samples`, `batches` and `run`/`__call__` on the same reader." 
+ ) + + if ctx is None: + ctx = _eval_context.EvalContext.get() + with ctx: + if not self._is_backend_initialized(): + if self._actual_batch_size is None: + self._actual_batch_size = 1 + if self._max_batch_size is None: + self._max_batch_size = self._actual_batch_size + self._init_backend(ctx, (), {}) + meta = self._op_backend.GetReaderMeta() + idx = 0 + while idx < meta["epoch_size_padded"]: + outputs = super().run(ctx, batch_size=self._actual_batch_size) + batch_size = len(outputs[0]) + assert batch_size == self._actual_batch_size + idx += batch_size + for x in zip(*outputs): + outs = tuple(Tensor(o) for o in x) + yield outs + + def batches(self, batch_size=None, ctx: Optional[_eval_context.EvalContext] = None): + if self._api_type is None: + self._api_type = "batches" + elif self._api_type != "batches": + raise RuntimeError("Cannot mix samples(), batches() and run() on the same reader.") + + if ctx is None: + ctx = _eval_context.EvalContext.get() + with ctx: + if batch_size is None: + batch_size = self._batch_size + if batch_size is None: + raise ValueError("Batch size was not specified") + if not self._op_backend: + if self._max_batch_size and self._max_batch_size < batch_size: + raise ValueError( + f"`batch_size` {batch_size} is larger than the `max_batch_size` " + f"{self._max_batch_size} specified when the operator was created" + ) + self._max_batch_size = batch_size + self._init_backend(ctx, (), {}) + else: + if self._max_batch_size and self._max_batch_size != batch_size: + raise ValueError( + f"`batch_size` {batch_size} is different than the `max_batch_size` " + f"{self._max_batch_size} used in the previous call" + ) + meta = self._op_backend.GetReaderMeta() + idx = 0 + while idx < meta["epoch_size_padded"]: + outputs = super().run(ctx, batch_size=batch_size) + batch_size_returned = len(outputs[0]) + assert batch_size_returned == batch_size + idx += batch_size_returned + yield tuple(Batch(o) for o in outputs) + + +_all_ops = [] + + +def _initialize(): + from . import _op_builder + + global _all_ops + _all_ops = _op_builder.build_operators() diff --git a/dali/test/python/experimental_mode/test_operator_device.py b/dali/test/python/experimental_mode/test_operator_device.py new file mode 100644 index 0000000000..4d1ac05cbd --- /dev/null +++ b/dali/test/python/experimental_mode/test_operator_device.py @@ -0,0 +1,410 @@ +# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
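+
+# These tests exercise device placement and device inference in the
+# experimental dali2 API: tensor construction, arithmetic, operator
+# invocation, decoders and readers.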
+ +import nvidia.dali.experimental.dali2 as dali2 +import nvidia.dali.backend as _backend +from nose_utils import SkipTest, assert_raises, attr +from test_utils import get_dali_extra_path +import numpy as np +import os + +test_data_root = get_dali_extra_path() + +def test_tensor_creation_with_device_string_gpu(): + t = dali2.Tensor(np.array([1, 2, 3]), device="gpu") + assert t.device.device_type == "gpu" + assert t.device.device_id == 0 + +def test_tensor_creation_with_device_gpu_object(): + t = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("gpu")) + assert t.device.device_type == "gpu" + assert t.device.device_id == 0 + +def test_tensor_creation_with_device_string_cpu(): + t = dali2.Tensor(np.array([1, 2, 3]), device="cpu") + assert t.device.device_type == "cpu" + +def test_tensor_creation_with_device_cpu_object(): + t = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("cpu")) + assert t.device.device_type == "cpu" + +def test_tensor_creation_with_device_gpu_object_variants(): + arr = np.array([1, 2, 3]) + t1 = dali2.Tensor(arr, device=dali2.Device("gpu:0")) + assert t1.device.device_type == "gpu" + assert t1.device.device_id == 0 + t2 = dali2.Tensor(arr, device=dali2.Device("gpu")) + assert t2.device.device_type == "gpu" + assert t2.device.device_id == 0 + t3 = dali2.Tensor(arr, device=dali2.Device("gpu", 0)) + assert t3.device.device_type == "gpu" + assert t3.device.device_id == 0 + + +def test_tensor_creation_with_device_cuda_object_variants(): + arr = np.array([1, 2, 3]) + t1 = dali2.Tensor(arr, device=dali2.Device("cuda:0")) + assert t1.device.device_type == "gpu" + assert t1.device.device_id == 0 + t2 = dali2.Tensor(arr, device=dali2.Device("cuda")) + assert t2.device.device_type == "gpu" + assert t2.device.device_id == 0 + t3 = dali2.Tensor(arr, device=dali2.Device("cuda", 0)) + assert t3.device.device_type == "gpu" + assert t3.device.device_id == 0 + + +def test_tensor_creation_with_device_cpu_object_variants(): + arr = np.array([1, 2, 3]) + t1 = dali2.Tensor(arr, device=dali2.Device("cpu")) + assert t1.device.device_type == "cpu" + + +def test_tensor_addition_with_unanimous_gpu_inputs(): + gpu_tensor1 = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("gpu")) + gpu_tensor2 = dali2.Tensor(np.array([4, 5, 6]), device=dali2.Device("gpu")) + result = gpu_tensor1 + gpu_tensor2 + assert result.device.device_type == "gpu" + + +def test_tensor_addition_with_unanimous_cpu_inputs(): + cpu_tensor1 = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("cpu")) + cpu_tensor2 = dali2.Tensor(np.array([4, 5, 6]), device=dali2.Device("cpu")) + result = cpu_tensor1 + cpu_tensor2 + assert result.device.device_type == "cpu" + + +def test_tensor_addition_with_mixed_cpu_gpu_inputs_raises_error(): + cpu_tensor = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("cpu")) + gpu_tensor = dali2.Tensor(np.array([4, 5, 6]), device=dali2.Device("gpu")) + # This should raise an error when device=None and inputs have different devices + with assert_raises(RuntimeError, glob="*not on the requested device*"): + cpu_tensor + gpu_tensor + + +def test_tensor_addition_with_unanimous_gpu_ordinals(): + for device_id in range(_backend.GetCUDADeviceCount()): + gpu_tensor1 = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device(f"gpu:{device_id}")) + gpu_tensor2 = dali2.Tensor(np.array([4, 5, 6]), device=dali2.Device(f"gpu:{device_id}")) + result = gpu_tensor1 + gpu_tensor2 + assert result.device.device_type == "gpu" + assert result.device.device_id == device_id + + +@attr("multi_gpu") +def 
test_tensor_addition_with_mixed_gpu_ordinals_raises_error(): + if _backend.GetCUDADeviceCount() < 2: + raise SkipTest("At least 2 devices needed for the test") + gpu_tensor1 = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("gpu:0")) + gpu_tensor2 = dali2.Tensor(np.array([4, 5, 6]), device=dali2.Device("gpu:1")) + with assert_raises(RuntimeError, glob="*incompatible device*"): + gpu_tensor1 + gpu_tensor2 + + +def test_slice_operator_with_gpu_inputs_infers_gpu_device(): + data = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("gpu:0")) + anchor = dali2.Tensor( + [ + 1, + ], + device=dali2.Device("gpu:0"), + ) + shape = dali2.Tensor( + [ + 2, + ], + device=dali2.Device("gpu:0"), + ) + result = dali2.slice(data, anchor, shape, axes=(0,)) + assert result.device.device_type == "gpu" + assert result.device.device_id == 0 + + +def test_slice_operator_with_cpu_inputs_infers_cpu_device(): + data = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("cpu")) + anchor = dali2.Tensor( + [ + 1, + ], + device=dali2.Device("cpu"), + ) + shape = dali2.Tensor( + [ + 2, + ], + device=dali2.Device("cpu"), + ) + result = dali2.slice(data, anchor, shape, axes=(0,)) + assert result.device.device_type == "cpu" + + +def test_slice_operator_with_gpu_inputs_and_device_none_infers_gpu_device(): + data = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("gpu:0")) + anchor = dali2.Tensor( + [ + 1, + ], + device=dali2.Device("gpu:0"), + ) + shape = dali2.Tensor( + [ + 2, + ], + device=dali2.Device("gpu:0"), + ) + result = dali2.slice(data, anchor, shape, axes=(0,), device=None) + assert result.device.device_type == "gpu" + assert result.device.device_id == 0 + + +def test_slice_operator_with_cpu_inputs_and_device_none_infers_cpu_device(): + data = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("cpu")) + anchor = dali2.Tensor( + [ + 1, + ], + device=dali2.Device("cpu"), + ) + shape = dali2.Tensor( + [ + 2, + ], + device=dali2.Device("cpu"), + ) + result = dali2.slice(data, anchor, shape, axes=(0,), device=None) + assert result.device.device_type == "cpu" + + +def test_slice_operator_with_cpu_inputs_and_gpu_device_raises_error(): + data = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("cpu")) + anchor = dali2.Tensor( + [ + 1, + ], + device=dali2.Device("cpu"), + ) + shape = dali2.Tensor( + [ + 2, + ], + device=dali2.Device("cpu"), + ) + with assert_raises(RuntimeError, glob="*incompatible device*"): + result = dali2.slice(data, anchor, shape, device=dali2.Device("gpu:0"), axes=(0,)) + + +def test_slice_operator_kwargs_must_be_cpu_tensors(): + data = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("gpu:0")) + end = dali2.Tensor( + [ + 2, + ], + device=dali2.Device("gpu:0"), + ) + with assert_raises(TypeError, glob="*'end'*"): + result = dali2.slice(data, device=dali2.Device("gpu:0"), end=end) + res_cpu = result.cpu() + + +def test_image_decoder_defaults_to_cpu_device(): + image_path = os.path.join(test_data_root, "db", "single", "jpeg", "100", "swan-3584559_640.jpg") + raw_bytes = np.fromfile(image_path, dtype=np.uint8) + result = dali2.decoders.image(raw_bytes) + assert result.device.device_type == "cpu" + + +def test_image_decoder_with_explicit_mixed_device(): + print("TODO(janton): to be removed") + image_path = os.path.join(test_data_root, "db", "single", "jpeg", "100", "swan-3584559_640.jpg") + raw_bytes = np.fromfile(image_path, dtype=np.uint8) + result = dali2.decoders.image(raw_bytes, device="mixed") + assert result.device.device_type == "gpu" + assert result.device.device_id 
== 0 + + +def test_image_decoder_with_explicit_gpu_device(): + raise SkipTest("TODO(janton): Not yet supported") + image_path = os.path.join(test_data_root, "db", "single", "jpeg", "100", "swan-3584559_640.jpg") + raw_bytes = np.fromfile(image_path, dtype=np.uint8) + result = dali2.decoders.image(raw_bytes, device=dali2.Device("gpu:0")) + assert result.device.device_type == "gpu" + assert result.device.device_id == 0 + + +def test_file_reader_defaults_to_cpu_device(): + reader = dali2.readers.File(file_root=os.path.join(test_data_root, "db", "single", "jpeg")) + count = 0 + for data, label in reader.samples(): + assert data.device.device_type == "cpu" + assert label.device.device_type == "cpu" + count += 1 + if count >= 5: + break + + +def test_file_reader_with_gpu_device_raises_error(): + with assert_raises(RuntimeError, glob="*not registered for gpu*"): + reader = dali2.readers.File( + file_root=os.path.join(test_data_root, "db", "single", "jpeg"), + device=dali2.Device("gpu:0"), + ) + for data, label in reader.samples(): + pass + + +def test_tensor_addition_with_cuda_string_variations(): + # Test 'cuda:0' vs 'gpu:0' equivalence + gpu_tensor1 = dali2.Tensor(np.array([1, 2, 3]), device=dali2.Device("cuda:0")) + gpu_tensor2 = dali2.Tensor(np.array([4, 5, 6]), device=dali2.Device("gpu:0")) + result1 = gpu_tensor1 + gpu_tensor2 + assert result1.device.device_type == "gpu" + assert result1.device.device_id == 0 + + +def test_uniform_operator_device_defaults_to_cpu(): + # Test that the uniform operator produces tensors on the correct device + uniform_op = dali2.ops.random.Uniform(max_batch_size=8) + result = uniform_op() + assert result.device.device_type == "cpu" + + +def test_uniform_operator_device_explicit_cpu(): + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="cpu") + result = uniform_op() + assert result.device.device_type == "cpu" + + +def test_uniform_operator_device_explicit_gpu(): + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="gpu") + result = uniform_op() + assert result.device.device_type == "gpu" + + +def test_uniform_operator_device_explicit_gpu_ordinal(): + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="gpu:0") + result = uniform_op() + assert result.device.device_type == "gpu" + assert result.device.device_id == 0 + + if _backend.GetCUDADeviceCount() >= 2: + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="gpu:1") + result = uniform_op() + assert result.device.device_type == "gpu" + assert result.device.device_id == 1 + + +def test_uniform_operator_device_explicit_cuda_ordinal(): + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="cuda:0") + result = uniform_op() + assert result.device.device_type == "gpu" + assert result.device.device_id == 0 + + if _backend.GetCUDADeviceCount() >= 2: + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="cuda:1") + result = uniform_op() + assert result.device.device_type == "gpu" + assert result.device.device_id == 1 + + +def test_video_reader_device_inference(): + video_path = os.path.join( + test_data_root, "db", "video", "sintel", "video_files", "sintel_trailer-720p_0.mp4" + ) + with assert_raises(RuntimeError, glob="*not registered for cpu*"): + video_reader = dali2.readers.Video(filenames=[video_path], sequence_length=10) + (result,) = next(video_reader.samples()) + + +def test_video_reader_device_explicit_gpu(): + video_path = os.path.join( + test_data_root, "db", "video", "sintel", "video_files", "sintel_trailer-720p_0.mp4" + ) + video_reader = 
dali2.readers.Video(filenames=[video_path], sequence_length=10, device="gpu") + (result,) = next(video_reader.samples()) + assert result.device.device_type == "gpu" + assert result.device.device_id == 0 + + +def test_video_decoder_device_inference(): + video_path = os.path.join( + test_data_root, "db", "video", "sintel", "video_files", "sintel_trailer-720p_0.mp4" + ) + encoded_video = np.fromfile(video_path, dtype=np.uint8) + decoded = dali2.experimental.decoders.video(encoded_video) + assert decoded.device.device_type == "cpu" + + +def test_video_decoder_device_explicit_mixed(): + video_path = os.path.join( + test_data_root, "db", "video", "sintel", "video_files", "sintel_trailer-720p_0.mp4" + ) + encoded_video = np.fromfile(video_path, dtype=np.uint8) + decoded = dali2.experimental.decoders.video( + encoded_video, start_frame=0, sequence_length=10, device="mixed" + ) + assert decoded.device.device_type == "gpu" + assert decoded.device.device_id == 0 + + +def test_video_decoder_device_explicit_gpu(): + raise SkipTest("TODO(janton): Not yet supported") + video_path = os.path.join( + test_data_root, "db", "video", "sintel", "video_files", "sintel_trailer-720p_0.mp4" + ) + encoded_video = np.fromfile(video_path, dtype=np.uint8) + decoded = dali2.experimental.decoders.video(encoded_video, sequence_length=10, device="gpu") + assert decoded.device.device_type == "gpu" + assert decoded.device.device_id == 0 + + +@attr("pytorch") +def test_operator_with_torch_device_cpu(): + import torch + + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device=torch.device("cpu")) + result = uniform_op() + assert result.device.device_type == "cpu" + + +@attr("pytorch") +def test_operator_with_torch_device_gpu(): + import torch + + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device=torch.device("cuda")) + result = uniform_op() + assert result.device.device_type == "gpu" + assert result.device.device_id == 0 + + +@attr("pytorch") +def test_operator_with_torch_device_gpu_ordinal(): + import torch + + for device_id in range(_backend.GetCUDADeviceCount()): + uniform_op = dali2.ops.random.Uniform( + max_batch_size=8, device=torch.device(f"cuda:{device_id}") + ) + result = uniform_op() + assert result.device.device_type == "gpu" + assert result.device.device_id == device_id + + +def test_operator_invalid_device_string(): + with assert_raises(ValueError, glob="*Invalid device*"): + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="invalid") + + +def test_operator_invalid_device_string_ordinal(): + with assert_raises(ValueError, glob="*Invalid device*"): + uniform_op = dali2.ops.random.Uniform(max_batch_size=8, device="gpu:999")
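
For orientation, a minimal usage sketch of the `Tensor`, `tensor` and `as_tensor` entry points introduced above (illustrative only; it follows the docstrings and tests in this patch, assumes a CUDA-capable device for the GPU path, and the layout string "AB" is an arbitrary example):

import numpy as np
import nvidia.dali.experimental.dali2 as dali2

arr = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.int32)

# as_tensor wraps the existing buffer when possible (copy=False under the hood),
# while tensor always makes an independent copy (copy=True).
wrapped = dali2.as_tensor(arr, layout="AB")
copied = dali2.tensor(arr, device="gpu")

# Arithmetic uses the standard Python operators; moving the result back to the
# host is done with cpu() / to_device().
other = dali2.Tensor(np.array([[10, 20, 30], [40, 50, 60]], dtype=np.int32), device="gpu")
result = (copied + other).cpu()
print(result.shape, result.dtype, result.device)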