Skip to content

Commit

Permalink
Add missing stream keyword argument to dpnp.ndarray.to_device met…
Browse files Browse the repository at this point in the history
…hod (#2263)

The PR proposes to align `dpnp.ndarray.to_device` method with python
array API and to add support of `stream` keyword argument.
The tests coverage is extended to cover new argument. Previously muted
array-api test is enabled back.

Note, tests for `dpnp.__dlpack__` are updated due to recent changes done
in [dpctl#1969](IntelPython/dpctl#1969).
  • Loading branch information
antonwolfy authored Jan 17, 2025
1 parent 952a798 commit 6cc2348
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 29 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/array-api-skips.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ array_api_tests/test_sorting_functions.py::test_sort
array_api_tests/test_signatures.py::test_func_signature[std]
array_api_tests/test_signatures.py::test_func_signature[var]

# missing 'stream' keyword argument
array_api_tests/test_signatures.py::test_array_method_signature[to_device]

# wrong shape is returned
array_api_tests/test_linalg.py::test_vecdot
array_api_tests/test_linalg.py::test_linalg_vecdot
Expand Down
49 changes: 42 additions & 7 deletions dpnp/dpnp_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,6 @@ def mT(self):

return dpnp_array._create_from_usm_ndarray(self._array_obj.mT)

def to_device(self, target_device):
"""Transfer array to target device."""

return dpnp_array(
shape=self.shape, buffer=self.get_array().to_device(target_device)
)

@property
def sycl_queue(self):
return self._array_obj.sycl_queue
Expand Down Expand Up @@ -1693,6 +1686,48 @@ def take(self, indices, axis=None, out=None, mode="wrap"):

return dpnp.take(self, indices, axis=axis, out=out, mode=mode)

def to_device(self, device, /, *, stream=None):
"""
Transfers this array to specified target device.
Parameters
----------
device : {string, SyclDevice, SyclQueue}
Array API concept of target device. It can be an OneAPI filter
selector string, an instance of :class:`dpctl.SyclDevice`
corresponding to a non-partitioned SYCL device, an instance of
:class:`dpctl.SyclQueue`, or a :class:`dpctl.tensor.Device` object
returned by :obj:`dpnp.dpnp_array.dpnp_array.device` property.
stream : {SyclQueue, None}, optional
Execution queue to synchronize with. If ``None``, synchronization
is not performed.
Default: ``None``.
Returns
-------
out : dpnp.ndarray
A view if data copy is not required, and a copy otherwise.
If copying is required, it is done by copying from the original
allocation device to the host, followed by copying from host
to the target device.
Examples
--------
>>> import dpnp as np, dpctl
>>> x = np.full(100, 2, dtype=np.int64)
>>> q_prof = dpctl.SyclQueue(x.sycl_device, property="enable_profiling")
>>> # return a view with profile-enabled queue
>>> y = x.to_device(q_prof)
>>> timer = dpctl.SyclTimer()
>>> with timer(q_prof):
... z = y * y
>>> print(timer.dt)
"""

usm_res = self._array_obj.to_device(device, stream=stream)
return dpnp_array._create_from_usm_ndarray(usm_res)

# 'tobytes',
# 'tofile',
# 'tolist',
Expand Down
14 changes: 12 additions & 2 deletions dpnp/tests/test_dlpack.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import dpctl
import numpy
import pytest
from numpy.testing import assert_array_equal
from numpy.testing import assert_array_equal, assert_raises

import dpnp

Expand All @@ -10,11 +11,20 @@


class TestDLPack:
@pytest.mark.parametrize("stream", [None, 1])
@pytest.mark.parametrize("stream", [None, dpctl.SyclQueue()])
def test_stream(self, stream):
x = dpnp.arange(5)
x.__dlpack__(stream=stream)

@pytest.mark.parametrize(
"stream",
[1, dict(), dpctl.SyclDevice()],
ids=["scalar", "dictionary", "device"],
)
def test_invaid_stream(self, stream):
x = dpnp.arange(5)
assert_raises(TypeError, x.__dlpack__, stream=stream)

@pytest.mark.parametrize("copy", [True, None, False])
def test_copy(self, copy):
x = dpnp.arange(5)
Expand Down
75 changes: 59 additions & 16 deletions dpnp/tests/test_sycl_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -1932,23 +1932,66 @@ def test_svd(shape, full_matrices, compute_uv, device):
assert_sycl_queue_equal(dpnp_s_queue, expected_queue)


@pytest.mark.parametrize(
"device_from",
valid_devices,
ids=[device.filter_string for device in valid_devices],
)
@pytest.mark.parametrize(
"device_to",
valid_devices,
ids=[device.filter_string for device in valid_devices],
)
def test_to_device(device_from, device_to):
data = [1.0, 1.0, 1.0, 1.0, 1.0]

x = dpnp.array(data, dtype=dpnp.float32, device=device_from)
y = x.to_device(device_to)
class TestToDevice:
@pytest.mark.parametrize(
"device_from",
valid_devices,
ids=[device.filter_string for device in valid_devices],
)
@pytest.mark.parametrize(
"device_to",
valid_devices,
ids=[device.filter_string for device in valid_devices],
)
def test_basic(self, device_from, device_to):
data = [1.0, 1.0, 1.0, 1.0, 1.0]
x = dpnp.array(data, dtype=dpnp.float32, device=device_from)

y = x.to_device(device_to)
assert y.sycl_device == device_to
assert (x.asnumpy() == y.asnumpy()).all()

def test_to_queue(self):
x = dpnp.full(100, 2, dtype=dpnp.int64)
q_prof = dpctl.SyclQueue(x.sycl_device, property="enable_profiling")

y = x.to_device(q_prof)
assert (x.asnumpy() == y.asnumpy()).all()
assert_sycl_queue_equal(y.sycl_queue, q_prof)

def test_stream(self):
x = dpnp.full(100, 2, dtype=dpnp.int64)
q_prof = dpctl.SyclQueue(x.sycl_device, property="enable_profiling")
q_exec = dpctl.SyclQueue(x.sycl_device)

y = x.to_device(q_prof, stream=q_exec)
assert (x.asnumpy() == y.asnumpy()).all()
assert_sycl_queue_equal(y.sycl_queue, q_prof)

q_exec = dpctl.SyclQueue(x.sycl_device)
_ = dpnp.linspace(0, 20, num=10**5, sycl_queue=q_exec)
y = x.to_device(q_prof, stream=q_exec)
assert (x.asnumpy() == y.asnumpy()).all()
assert_sycl_queue_equal(y.sycl_queue, q_prof)

def test_stream_no_sync(self):
x = dpnp.full(100, 2, dtype=dpnp.int64)
q_prof = dpctl.SyclQueue(x.sycl_device, property="enable_profiling")

for stream in [None, x.sycl_queue]:
y = x.to_device(q_prof, stream=stream)
assert (x.asnumpy() == y.asnumpy()).all()
assert_sycl_queue_equal(y.sycl_queue, q_prof)

assert y.sycl_device == device_to
@pytest.mark.parametrize(
"stream",
[1, dict(), dpctl.SyclDevice()],
ids=["scalar", "dictionary", "device"],
)
def test_invalid_stream(self, stream):
x = dpnp.ones(2, dtype=dpnp.int64)
q_prof = dpctl.SyclQueue(x.sycl_device, property="enable_profiling")
assert_raises(TypeError, x.to_device, q_prof, stream=stream)


@pytest.mark.parametrize(
Expand Down
8 changes: 7 additions & 1 deletion dpnp/tests/third_party/cupy/core_tests/test_dlpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ def test_stream(self):
for src_s in [self._get_stream(s) for s in allowed_streams]:
for dst_s in [self._get_stream(s) for s in allowed_streams]:
orig_array = _gen_array(cupy.float32, alloc_q=src_s)
dltensor = orig_array.__dlpack__(stream=orig_array)

q = dpctl.SyclQueue(
orig_array.sycl_context,
orig_array.sycl_device,
property="enable_profiling",
)
dltensor = orig_array.__dlpack__(stream=q)

out_array = dlp.from_dlpack_capsule(dltensor)
out_array = cupy.from_dlpack(out_array, device=dst_s)
Expand Down

0 comments on commit 6cc2348

Please sign in to comment.