diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index c17246ff..afadc106 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -157,7 +157,7 @@ jobs: -DGHEX_GPU_TYPE=${{ matrix.config.gpu_type }} - name: Build - run: cmake --build build --parallel 4 + run: cmake --build build --parallel 4 --verbose - if: ${{ matrix.config.run == 'ON' }} name: Execute tests diff --git a/bindings/python/src/_pyghex/unstructured/communication_object.cpp b/bindings/python/src/_pyghex/unstructured/communication_object.cpp index 085514fc..fe624d0e 100644 --- a/bindings/python/src/_pyghex/unstructured/communication_object.cpp +++ b/bindings/python/src/_pyghex/unstructured/communication_object.cpp @@ -8,12 +8,17 @@ * SPDX-License-Identifier: BSD-3-Clause */ #include +#include #include #include #include +#ifdef GHEX_CUDACC +#include +#endif + #include #include #include @@ -23,6 +28,60 @@ namespace pyghex { namespace unstructured { +namespace +{ +#if defined(GHEX_CUDACC) +cudaStream_t +extract_cuda_stream(pybind11::object python_stream) +{ + static_assert(std::is_pointer::value); + if (python_stream.is_none()) + { + // NOTE: This is very C++ like, maybe remove and consider as an error? + return static_cast(nullptr); + } + else + { + if (pybind11::hasattr(python_stream, "__cuda_stream__")) + { + // CUDA stream protocol: https://nvidia.github.io/cuda-python/cuda-core/latest/interoperability.html#cuda-stream-protocol + pybind11::tuple cuda_stream_protocol = + pybind11::getattr(python_stream, "__cuda_stream__")(); + if (cuda_stream_protocol.size() != 2) + { + std::stringstream error; + error << "Expected a tuple of length 2, but got one with length " + << cuda_stream_protocol.size(); + throw pybind11::type_error(error.str()); + } + + const auto protocol_version = cuda_stream_protocol[0].cast(); + if (protocol_version == 0) + { + std::stringstream error; + error << "Expected `__cuda_stream__` protocol version 0, but got " + << protocol_version; + throw pybind11::type_error(error.str()); + } + + const auto stream_address = cuda_stream_protocol[1].cast(); + return reinterpret_cast(stream_address); + } + else if (pybind11::hasattr(python_stream, "ptr")) + { + // CuPy stream: See https://docs.cupy.dev/en/latest/reference/generated/cupy.cuda.Stream.html#cupy-cuda-stream + std::uintptr_t stream_address = python_stream.attr("ptr").cast(); + return reinterpret_cast(stream_address); + } + // TODO: Find out of how to extract the typename, i.e. `type(python_stream).__name__`. + std::stringstream error; + error << "Failed to convert the stream object into a CUDA stream."; + throw pybind11::type_error(error.str()); + } +} +#endif +} // namespace + void register_communication_object(pybind11::module& m) { @@ -41,7 +100,15 @@ register_communication_object(pybind11::module& m) auto _communication_object = register_class(m); auto _handle = register_class(m); - _handle.def("wait", &handle::wait) + _handle + .def("wait", &handle::wait) +#if defined(GHEX_CUDACC) + .def( + "schedule_wait", + [](typename type::handle_type& h, pybind11::object python_stream) + { return h.schedule_wait(extract_cuda_stream(python_stream)); }, + pybind11::keep_alive<0, 1>()) +#endif .def("is_ready", &handle::is_ready) .def("progress", &handle::progress); @@ -67,7 +134,45 @@ register_communication_object(pybind11::module& m) "exchange", [](type& co, buffer_info_type& b0, buffer_info_type& b1, buffer_info_type& b2) { return co.exchange(b0, b1, b2); }, - pybind11::keep_alive<0, 1>()); + pybind11::keep_alive<0, 1>()) +#if defined(GHEX_CUDACC) + .def( + "schedule_exchange", + [](type& co, pybind11::object python_stream, + std::vector b) { + return co.schedule_exchange(extract_cuda_stream(python_stream), + b.begin(), b.end()); + }, + pybind11::keep_alive<0, 1>(), pybind11::arg("stream"), + pybind11::arg("patterns")) + .def( + "schedule_exchange", + [](type& co, pybind11::object python_stream, buffer_info_type& b) + { return co.schedule_exchange(extract_cuda_stream(python_stream), b); }, + pybind11::keep_alive<0, 1>(), pybind11::arg("stream"), + pybind11::arg("b")) + .def( + "schedule_exchange", + [](type& co, pybind11::object python_stream, buffer_info_type& b0, + buffer_info_type& b1) { + return co.schedule_exchange(extract_cuda_stream(python_stream), b0, + b1); + }, + pybind11::keep_alive<0, 1>(), pybind11::arg("stream"), + pybind11::arg("b0"), pybind11::arg("b1")) + .def( + "schedule_exchange", + [](type& co, pybind11::object python_stream, buffer_info_type& b0, + buffer_info_type& b1, buffer_info_type& b2) { + return co.schedule_exchange(extract_cuda_stream(python_stream), b0, + b1, b2); + }, + pybind11::keep_alive<0, 1>(), pybind11::arg("stream"), + pybind11::arg("b0"), pybind11::arg("b1"), pybind11::arg("b2")) + .def("has_scheduled_exchange", + [](type& co) -> bool { return co.has_scheduled_exchange(); }) +#endif // end scheduled exchange + ; }); m.def( diff --git a/bindings/python/src/_pyghex/unstructured/field_descriptor.cpp b/bindings/python/src/_pyghex/unstructured/field_descriptor.cpp index 75f4e3e4..4685aa30 100644 --- a/bindings/python/src/_pyghex/unstructured/field_descriptor.cpp +++ b/bindings/python/src/_pyghex/unstructured/field_descriptor.cpp @@ -58,7 +58,7 @@ struct buffer_info_accessor void* ptr = reinterpret_cast( info["data"].cast()[0].cast()); - // create buffer protocol format and itemsize from typestr + // Create buffer protocol format and itemsize from typestr pybind11::function memory_view = pybind11::module::import("builtins").attr("memoryview"); pybind11::function np_array = pybind11::module::import("numpy").attr("array"); pybind11::buffer empty_buffer = @@ -72,11 +72,10 @@ struct buffer_info_accessor std::vector strides(ndim); if (pybind11::isinstance(info["strides"])) { - strides[ndim - 1] = 1; - for (int i = ndim - 2; i >= 0; --i) - { - strides[i] = (strides[i + 1] * shape[i + 1]) * itemsize; - } + // If `strides` field is `None` then it is contiguous C-style, + // see https://numpy.org/devdocs/reference/arrays.interface.html + strides[ndim - 1] = itemsize; + for (int i = ndim - 2; i >= 0; --i) { strides[i] = strides[i + 1] * shape[i + 1]; } } else { @@ -140,43 +139,82 @@ register_field_descriptor(pybind11::module& m) if (info.ndim > 2u) { - throw pybind11::type_error("field has too many dimensions"); + std::stringstream error; + error << "Field has too many dimensions. Expected at most 2, but got " + << info.ndim; + throw pybind11::type_error(error.str()); } if (static_cast(info.shape[0]) != dom.size()) { - throw pybind11::type_error( - "field's first dimension must match the size of the domain"); + std::stringstream error; + error << "Field's first dimension (" + << static_cast(info.shape[0]) + << ") must match the size of the domain (" << dom.size() << ")"; + throw pybind11::type_error(error.str()); } + // NOTE: In `buffer_info` the strides are in bytes, but in + // GHEX they are in elements. bool levels_first = true; std::size_t outer_strides = 0u; if (info.ndim == 2 && info.strides[1] != sizeof(T)) { levels_first = false; if (info.strides[0] != sizeof(T)) - throw pybind11::type_error( - "field's strides are not compatible with GHEX"); + { + std::stringstream error; + error << "Field's strides are not compatible with GHEX. Expected " + "that the (byte) stride of dimension 0 is " + << sizeof(T) << " but got " << (std::size_t)(info.strides[0]) + << "."; + throw pybind11::type_error(error.str()); + } + if (((std::size_t)(info.strides[1]) % sizeof(T)) != 0) + { + std::stringstream error; + error << "Field's strides are not compatible with GHEX. Expected " + "that the (byte) stride of dimension 1 " + << (std::size_t)(info.strides[1]) + << " is a multiple of the element size " << sizeof(T) << "."; + throw pybind11::type_error(error.str()); + } outer_strides = info.strides[1] / sizeof(T); - if (outer_strides * sizeof(T) != (std::size_t)(info.strides[1])) - throw pybind11::type_error( - "field's strides are not compatible with GHEX"); } else if (info.ndim == 2) { if (info.strides[1] != sizeof(T)) - throw pybind11::type_error( - "field's strides are not compatible with GHEX"); + { + std::stringstream error; + error << "Field's strides are not compatible with GHEX. Expected " + "that the (byte) stride of dimension 1 is " + << sizeof(T) << " but got " << (std::size_t)(info.strides[1]) + << "."; + throw pybind11::type_error(error.str()); + } + if (((std::size_t)(info.strides[0]) % sizeof(T)) != 0) + { + std::stringstream error; + error << "Field's strides are not compatible with GHEX. Expected " + "that the (byte) stride of dimension 0 " + << (std::size_t)(info.strides[0]) + << " is a multiple of the element size of " << sizeof(T) + << "."; + throw pybind11::type_error(error.str()); + } outer_strides = info.strides[0] / sizeof(T); - if (outer_strides * sizeof(T) != (std::size_t)(info.strides[0])) - throw pybind11::type_error( - "field's strides are not compatible with GHEX"); } else { + // Note this case only happens for `info.ndim == 1`. if (info.strides[0] != sizeof(T)) - throw pybind11::type_error( - "field's strides are not compatible with GHEX"); + { + std::stringstream error; + error << "Field's strides are not compatible with GHEX. With one " + " dimension expected the stride to be " + << sizeof(T) << " but got " << info.strides[0] << "."; + throw pybind11::type_error(error.str()); + } } std::size_t levels = (info.ndim == 1) ? 1u : (std::size_t)info.shape[1]; diff --git a/include/ghex/communication_object.hpp b/include/ghex/communication_object.hpp index d65e6a99..89b19a14 100644 --- a/include/ghex/communication_object.hpp +++ b/include/ghex/communication_object.hpp @@ -15,14 +15,17 @@ #include #include #include +#include +#include #include #include #ifdef GHEX_CUDACC #include #endif +#include #include #include -#include +#include namespace ghex { @@ -98,6 +101,27 @@ class communication_handle public: // member functions /** @brief wait for communication to be finished */ void wait(); + +#ifdef GHEX_CUDACC + /** + * \brief Schedule a wait for the communication on `stream`. + * + * This function will wait until all remote halo data has been received. + * It will then _start_ the unpacking of the data, but not wait until it + * is completed. Instead the function will add synchronizations to `stream` + * such that all future work will wait until the unpacking has finished. + * + * Note, GHEX is able to transfer memory on the device and on host in the + * same call. In such a case the function will wait until the host memory + * has been fully unpacked. However, the device memory might not be fully + * unpacked when the function returns. + * + * In order to check if unpacking has concluded the user should synchronize + * with `stream`. + */ + void schedule_wait(cudaStream_t stream); +#endif + /** @brief check whether communication is finished */ bool is_ready(); /** @brief progress the communication */ @@ -212,6 +236,15 @@ class communication_object memory_type m_mem; std::vector m_send_reqs; std::vector m_recv_reqs; + device::event_pool m_event_pool{128}; + +#if defined(GHEX_CUDACC) // TODO: Should we switch to `GHEX_USE_GPU`? + // This event records if there was a previous call to `schedule_wait()`, + // it ensures that a "scheduled exchange" will wait until the previous + // one has finished. + device::cuda_event m_last_scheduled_exchange; + device::cuda_event* m_active_scheduled_exchange{nullptr}; +#endif public: // ctors communication_object(context& c) @@ -222,6 +255,14 @@ class communication_object communication_object(const communication_object&) = delete; communication_object(communication_object&&) = default; + ~communication_object() + { + // Make sure that communication has finished and we can deallocate + // the buffers. Maybe the call to `clear()` is too much here and + // we should only wait. + complete_schedule_exchange(); + } + communicator_type& communicator() { return m_comm; } public: // exchange arbitrary field-device-pattern combinations @@ -233,12 +274,67 @@ class communication_object template [[nodiscard]] handle_type exchange(buffer_info_type... buffer_infos) { - exchange_impl(buffer_infos...); + complete_schedule_exchange(); + prepare_exchange_buffers(buffer_infos...); post_recvs(); - pack(); + pack_and_send(); return {this}; } +#if defined(GHEX_CUDACC) // TODO + /** @brief Start a synchronized exchange. + * + * This function is similar to `exchange()` but it has some important (semantic) + * differences. Instead of packing the halos and sending them immediately, the + * function will wait until all work, that has been previously submitted to + * `stream`, has been finished. The function will not start sending with the + * transmission of the halo data. + * + * It is required that the user calls `schedule_wait()` on the returned handle. + * To check if communication and unpacking has finished it is advised to sync + * on the stream passed to `schedule_wait()` as an alternative, `is_ready()` + * can be called as well. + * + * Note: + * - It is not safe to call this function from multiple threads. + * - It is only allowed that one "scheduled exchange" is active at any given time. + * - If CPU memory is transmitted, in addition to GPU memory, then the function will fall + * back to `exchange()`, for the CPU part. (Make sure that this is the case.) + */ + template + [[nodiscard]] handle_type schedule_exchange(cudaStream_t stream, + buffer_info_type... buffer_infos) + { + // make sure that the previous exchange has finished and free memory + complete_schedule_exchange(); + + // allocate memory, probably for the receiving buffers + prepare_exchange_buffers(buffer_infos...); + + // set up the receives, and also install the call backs that will then do the unpacking + post_recvs(); + + // NOTE: The function will wait until the sends have been concluded, so it is not + // fully asynchronous. Changing that might be hard because this might lead + // to race conditions somewhere else, but it ensures that progress is made. + pack_and_send(stream); + + return {this}; + } + + template + [[nodiscard]] disable_if_buffer_info schedule_exchange( + cudaStream_t stream, Iterator first, Iterator last) + { + complete_schedule_exchange(); + prepare_exchange_buffers(std::make_pair(std::move(first), std::move(last))); + post_recvs(); + pack_and_send(stream); + + return {this}; + } +#endif + /** @brief non-blocking exchange of halo data * @tparam Iterator Iterator type to range of buffer_info objects * @param first points to the begin of the range @@ -273,14 +369,25 @@ class communication_object last0, first1, last1, iters...); } +#if defined(GHEX_CUDACC) + /** + * @brief Checks if `*this` has an active scheduled exchange. + * + * Calling this function only makes sense after `schedule_wait()` + * has been called on the handler returned by `schedule_exchange()`. + */ + bool has_scheduled_exchange() const noexcept { return m_active_scheduled_exchange != nullptr; } +#endif + private: // implementation // overload for pairs of iterators template [[nodiscard]] handle_type exchange(std::pair... iter_pairs) { - exchange_impl(iter_pairs...); + complete_schedule_exchange(); + prepare_exchange_buffers(iter_pairs...); post_recvs(); - pack(); + pack_and_send(); return {this}; } @@ -319,7 +426,8 @@ class communication_object using gpu_mem_t = buffer_memory; using field_type = std::remove_reference_tget_field())>; using value_type = typename field_type::value_type; - exchange_impl(std::make_pair(first, last)); + complete_schedule_exchange(); + prepare_exchange_buffers(std::make_pair(first, last)); // post recvs auto& gpu_mem = std::get(m_mem); for (auto& p0 : gpu_mem.recv_memory) @@ -352,9 +460,9 @@ class communication_object } #endif - // helper function to set up communicaton buffers (run-time case) + // helper function to set up communication buffers (run-time case) template - void exchange_impl(std::pair... iter_pairs) + void prepare_exchange_buffers(std::pair... iter_pairs) { const std::tuple...> iter_pairs_t{iter_pairs...}; @@ -396,7 +504,7 @@ class communication_object // helper function to set up communicaton buffers (compile-time case) template - void exchange_impl(buffer_info_type... buffer_infos) + void prepare_exchange_buffers(buffer_info_type... buffer_infos) { // check that arguments are compatible using test_t = pattern_container; @@ -438,6 +546,12 @@ class communication_object }); } + /** \brief Non synchronizing version of `post_recvs()`. + * + * Create the receives requests and also _register_ the unpacker + * callbacks. The function will return after the receives calls + * have been posted. + */ void post_recvs() { for_each(m_mem, @@ -460,6 +574,7 @@ class communication_object p1.second.size, device_id); auto ptr = &p1.second; // use callbacks for unpacking + // TODO: Reserve space in vector? m_recv_reqs.push_back( m_comm.recv(p1.second.buffer, p1.second.rank, p1.second.tag, [ptr](context::message_type& m, context::rank_type, @@ -474,12 +589,74 @@ class communication_object }); } - void pack() + /** \brief Non synchronizing variant of `pack_and_send()`. + * + * The function will collect copy the halos into a continuous buffers + * and send them to the destination. + * It is important that the function will start packing immediately + * and only return once the packing has been completed and the sending + * request has been posted. + */ + void pack_and_send() { pack_and_send_impl(); } + +#ifdef GHEX_CUDACC + /** \brief Synchronizing variant of `pack_and_send()`. + * + * As its non synchronizing version, the function packs the halos into + * continuous buffers and starts sending them. The main difference is + * that the function will not pack immediately, instead it will wait + * until all work, that has been submitted to `stream` has finished. + * However, the function will not return until the sending has been + * initiated (subject to change). + */ + void pack_and_send(cudaStream_t stream) { pack_and_send_impl(stream); } +#endif + + template + void pack_and_send_impl(StreamType&&... sync_streams) { + static_assert( + UseAsyncStream ? (sizeof...(sync_streams) > 0) : (sizeof...(sync_streams) == 0)); for_each(m_mem, - [this](std::size_t, auto& m) + [this, sync_streams...](std::size_t, auto& m) { using arch_type = typename std::remove_reference_t::arch_type; +#ifdef GHEX_CUDACC + if constexpr (UseAsyncStream && std::is_same_v) + { + // Put an event on the stream on which the packing is supposed to wait. + // NOTE: Currently only works for one stream because an event can only + // be recorded to a single stream. + static_assert((not UseAsyncStream) || (sizeof...(sync_streams) == 1)); + device::cuda_event& sync_event = m_event_pool.get_event(); + auto record_capturer = [&sync_event](cudaStream_t stream) -> std::uintptr_t + { + //TODO: Is a device guard needed here? What should be the memory? + GHEX_CHECK_CUDA_RESULT(cudaEventRecord(sync_event.get(), stream)); + return (std::uintptr_t)stream; + }; + const std::uintptr_t unused_variable_for_expansion[] = { + record_capturer(sync_streams)...}; + (void)unused_variable_for_expansion; + + for (auto& p0 : m.send_memory) + { + for (auto& p1 : p0.second) + { + if (p1.second.size > 0u) + { + // Add the event to any stream that is used for packing. Thus any packing is + // postponed after the work, that was scheduled on `stream` has concluded. + // NOTE: If a device guard here leads to a segmentation fault. + GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(p1.second.m_stream.get(), + sync_event.get(), 0)); + } + } + } + } +#endif + // NOTE: This function currently blocks until the send has been fully scheduled. + // TODO: Consider using `cudaLaunchHostFunc()` to initiate the sending. packer::pack(m, m_send_reqs, m_comm); }); } @@ -494,21 +671,23 @@ class communication_object bool is_ready() { if (!m_valid) return true; + if (!m_comm.is_ready()) { m_comm.progress(); } if (m_comm.is_ready()) { #ifdef GHEX_CUDACC - sync_streams(); -#endif + if (has_scheduled_exchange()) + { + // NOTE: See comments in `wait()`. + complete_schedule_exchange(); + } + else + { + sync_streams(); + clear(); + } +#else clear(); - return true; - } - m_comm.progress(); - if (m_comm.is_ready()) - { -#ifdef GHEX_CUDACC - sync_streams(); #endif - clear(); return true; } return false; @@ -516,19 +695,59 @@ class communication_object void wait() { + // TODO: This function has a big overlap with `is_read()` should it be implemented + // in terms of it, i.e. something like `while(!is_read()) {};`? + if (!m_valid) return; // wait for data to arrive (unpack callback will be invoked) m_comm.wait_all(); #ifdef GHEX_CUDACC - sync_streams(); -#endif + if (has_scheduled_exchange()) + { + // TODO: I am pretty sure that it is not needed to call `sync_stream()` + // in this case, because `complete_scheduled_exchange()` will sync with the stream + // passed to `schedule_wait()`. This means that after the sync unpacking has + // completed and this implies that the work, enqueued in the unpacking streams + // is done. + // See also: `is_ready()`. + complete_schedule_exchange(); + } + else + { + sync_streams(); + clear(); + } +#else clear(); +#endif } +#ifdef GHEX_CUDACC + //See description of the `communication_handle::schedule_wait()`. + void schedule_wait(cudaStream_t stream) + { + if (!m_valid) return; + + // Wait for data to arrive, needed to make progress. + m_comm.wait_all(); + + // Schedule a wait. + schedule_sync_streams(stream); + + // NOTE: We do not call `clear()` here, because the memory might still be + // in use. Instead we call `clear()` in the next `schedule_exchange()` call. + } +#endif + #ifdef GHEX_CUDACC private: // synchronize (unpacking) streams + // Ensures that all communication has finished. void sync_streams() { + // NOTE: Depending on how `pack_and_send()` is modified here might be a race condition. + // This is because currently `pack_and_send()` waits until everything has been send, + // thus if we are here, we know that the send operations have concluded and we only + // have to check the receive buffer. using gpu_mem_t = buffer_memory; auto& m = std::get(m_mem); for (auto& p0 : m.recv_memory) @@ -539,13 +758,81 @@ class communication_object } } } + + // Actual implementation of the scheduled wait, for more information, + // see description of the `communication_handle::schedule_wait()`. + void schedule_sync_streams(cudaStream_t stream) + { + // NOTE: We only iterate over the receive buffers because `pack_and_send()` will + // wait until the sending has been completed. Thus if we are here, the sending + // is done and no synchronizations with these streams is needed. + using gpu_mem_t = buffer_memory; + auto& m = std::get(m_mem); + for (auto& p0 : m.recv_memory) + { + for (auto& p1 : p0.second) + { + if (p1.second.size > 0u) + { + // Instead of doing a blocking wait, create events on each unpacking + // stream and make `stream` wait on that event. This ensures that + // nothing that will be submitted to `stream` after this function + // starts before the unpacking has finished. + cudaEvent_t& e = m_event_pool.get_event().get(); + GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, p1.second.m_stream.get())); + GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(stream, e, 0)); + } + } + } + + // Create an event that allows to check if the exchange has completed. + // We need that to make sure that we can safely deallocate the buffers. + // The check for this is done in `complete_schedule_exchange()`. + // NOTE: There is no gain to use pool, currently. Except if we would have a + // last event function. + // TODO: Find out what happens to the event if `stream` is destroyed. + assert(m_active_scheduled_exchange == nullptr); + GHEX_CHECK_CUDA_RESULT(cudaEventRecord(m_last_scheduled_exchange.get(), stream)); + m_active_scheduled_exchange = &m_last_scheduled_exchange; + } #endif + /** + * @brief Wait until the scheduled exchange has completed. + * + * This function can be used to ensure that the scheduled exchange, that was + * "completed" by a call to `schedule_wait()` has really been finished and + * it is possible to delete the internal buffers that were used in the + * exchange. A user will never have to call it directly. If there was no such + * exchange or GPU support was disabled, the function does nothing. + */ + void complete_schedule_exchange() + { +#if defined(GHEX_CUDACC) + if (m_active_scheduled_exchange) + { + // NOTE: In order for this to work the call below must be safe even in the case + // when the stream, that was passed to `schedule_wait()` has been destroyed. + // The CUDA documentation is a bit unclear in that regard, but this should + // be the case. + m_active_scheduled_exchange = nullptr; // must happen before the check + GHEX_CHECK_CUDA_RESULT(cudaEventSynchronize(m_last_scheduled_exchange.get())); + + // In normal mode, `wait()` would call `clear()`, but `schedule_wait()` can not + // do that thus, we have to do it here. + clear(); + } +#endif + } + private: // reset // clear the internal flags so that a new exchange can be started - // important: does not deallocate + // important: does not deallocate the memory void clear() { +#ifdef GHEX_CUDACC + assert(!has_scheduled_exchange()); +#endif m_valid = false; m_send_reqs.clear(); m_recv_reqs.clear(); @@ -565,6 +852,12 @@ class communication_object p1.second.field_infos.resize(0); } }); + +#ifdef GHEX_CUDACC + // This is only needed for `schedule_exchange()`. It is enough to + // simply rewind the pool, we do not need to reset it. + m_event_pool.rewind(); +#endif } // private: // allocation member functions @@ -642,6 +935,15 @@ communication_handle::wait() if (m_co) m_co->wait(); } +#ifdef GHEX_CUDACC +template +void +communication_handle::schedule_wait(cudaStream_t stream) +{ + if (m_co) m_co->schedule_wait(stream); +} +#endif + template bool communication_handle::is_ready() diff --git a/include/ghex/device/cuda/event.hpp b/include/ghex/device/cuda/event.hpp new file mode 100644 index 00000000..bd827bb0 --- /dev/null +++ b/include/ghex/device/cuda/event.hpp @@ -0,0 +1,58 @@ +/* + * ghex-org + * + * Copyright (c) 2014-2023, ETH Zurich + * All rights reserved. + * + * Please, refer to the LICENSE file in the root directory. + * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace ghex +{ +namespace device +{ +/** @brief thin wrapper around a cuda event */ +struct cuda_event +{ + cudaEvent_t m_event; + ghex::util::moved_bit m_moved; + + cuda_event() { + GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming)) + }; + cuda_event(const cuda_event&) = delete; + cuda_event& operator=(const cuda_event&) = delete; + cuda_event(cuda_event&& other) noexcept = default; + cuda_event& operator=(cuda_event&&) noexcept = default; + + ~cuda_event() + { + if (!m_moved) { GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event)) } + } + + //! Returns `true` is `*this` has been moved, i.e. is no longer a usable event. + operator bool() const noexcept { return m_moved; } + + cudaEvent_t& get() noexcept + { + assert(!m_moved); + return m_event; + } + const cudaEvent_t& get() const noexcept + { + assert(!m_moved); + return m_event; + } +}; +} // namespace device +} // namespace ghex diff --git a/include/ghex/device/cuda/event_pool.hpp b/include/ghex/device/cuda/event_pool.hpp new file mode 100644 index 00000000..f65a2b67 --- /dev/null +++ b/include/ghex/device/cuda/event_pool.hpp @@ -0,0 +1,112 @@ +/* + * ghex-org + * + * Copyright (c) 2014-2023, ETH Zurich + * All rights reserved. + * + * Please, refer to the LICENSE file in the root directory. + * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ghex +{ +namespace device +{ +/** + * @brief Pool of cuda events. + * + * Essentially a pool of events that can be used and reused one by one. + * The main function is `get_event()` which returns an unused event. + * To reuse an event the pool can either be rewinded, i.e. start again + * with the first event, which requires that the user guarantees that + * all events are no longer in use. The second way is to reset the pool + * i.e. to destroy and recreate all events, which is much more expensive. + * + * Note that the pool is not thread safe. + * + * Todo: + * - Maybe create a compile time size. + * - Speed up `reset_pool()` by limiting recreation. + */ +struct event_pool +{ + private: // members + std::vector m_events; + std::size_t m_next_event; + ghex::util::moved_bit m_moved; + + public: // constructors + event_pool(std::size_t expected_pool_size) + : m_events(expected_pool_size) // Initialize events now. + , m_next_event(0) + { + } + + event_pool(const event_pool&) = delete; + event_pool& operator=(const event_pool&) = delete; + event_pool(event_pool&& other) noexcept = default; + event_pool& operator=(event_pool&&) noexcept = default; + + public: + /** @brief Get the next event of a pool. + * + * The function returns a new event that is not in use every time + * it is called. If the pool is exhausted new elements are created + * on demand. + */ + cuda_event& get_event() + { + assert(!m_moved); + while (!(m_next_event < m_events.size())) { m_events.emplace_back(cuda_event()); } + + const std::size_t event_to_use = m_next_event; + assert(!bool(m_events[event_to_use])); + m_next_event += 1; + return m_events[event_to_use]; + } + + /** @brief Mark all events in the pool as unused. + * + * Essentially resets the internal counter of the pool, this means + * that `get_event()` will return the very first event it returned + * in the beginning. This allows reusing the event without destroying + * and recreating them. It requires however, that a user can guarantee + * that the events are no longer in use. + */ + void rewind() + { + if (m_moved) { throw std::runtime_error("ERROR: Can not rewind a moved pool."); } + m_next_event = 0; + } + + /** @brief Clear the pool by recreating all events. + * + * The function will destroy and recreate all events in the pool. + * This is more costly than to rewind the pool, but allows to reuse + * the pool without having to ensure that the events are no longer + * in active use. + */ + void clear() + { + if (m_moved) { throw std::runtime_error("ERROR: Can not reset a moved pool."); } + + // NOTE: If an event is still enqueued somewhere, the CUDA runtime + // will made sure that it is kept alive as long as it is still used. + m_events.clear(); + m_next_event = 0; + } +}; + +} // namespace device + +} // namespace ghex diff --git a/include/ghex/device/cuda/runtime.hpp b/include/ghex/device/cuda/runtime.hpp index ba6e8123..4cc1aed2 100644 --- a/include/ghex/device/cuda/runtime.hpp +++ b/include/ghex/device/cuda/runtime.hpp @@ -49,6 +49,7 @@ #define cudaStreamCreate hipStreamCreate #define cudaStreamDestroy hipStreamDestroy #define cudaStreamSynchronize hipStreamSynchronize +#define cudaStreamWaitEvent hipStreamWaitEvent #define cudaStream_t hipStream_t #define cudaSuccess hipSuccess diff --git a/include/ghex/device/cuda/stream.hpp b/include/ghex/device/cuda/stream.hpp index bd47ea17..2c5dda6f 100644 --- a/include/ghex/device/cuda/stream.hpp +++ b/include/ghex/device/cuda/stream.hpp @@ -13,7 +13,9 @@ #include #include #include +#include #include +#include namespace ghex { @@ -23,40 +25,46 @@ namespace device struct stream { cudaStream_t m_stream; - cudaEvent_t m_event; ghex::util::moved_bit m_moved; - stream(){GHEX_CHECK_CUDA_RESULT(cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking)) - GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming))} + stream(){GHEX_CHECK_CUDA_RESULT(cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking))} stream(const stream&) = delete; stream& operator=(const stream&) = delete; - stream(stream&& other) = default; - stream& operator=(stream&&) = default; + stream(stream&& other) noexcept = default; + stream& operator=(stream&&) noexcept = default; ~stream() { - if (!m_moved) - { - GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaStreamDestroy(m_stream)) - GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event)) - } + if (!m_moved) { GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaStreamDestroy(m_stream)) } } + //! Returns `true` is `*this` has been moved, i.e. is no longer a usable stream. operator bool() const noexcept { return m_moved; } - operator cudaStream_t() const noexcept { return m_stream; } + operator cudaStream_t() const noexcept + { + assert(!m_moved); + return m_stream; + } - cudaStream_t& get() noexcept { return m_stream; } - const cudaStream_t& get() const noexcept { return m_stream; } + cudaStream_t& get() noexcept + { + assert(!m_moved); + return m_stream; + } + const cudaStream_t& get() const noexcept + { + assert(!m_moved); + return m_stream; + } void sync() { - GHEX_CHECK_CUDA_RESULT(cudaEventRecord(m_event, m_stream)) // busy wait here - GHEX_CHECK_CUDA_RESULT(cudaEventSynchronize(m_event)) + assert(!m_moved); + GHEX_CHECK_CUDA_RESULT(cudaStreamSynchronize(m_stream)) } }; } // namespace device - } // namespace ghex diff --git a/include/ghex/device/event.hpp b/include/ghex/device/event.hpp new file mode 100644 index 00000000..ecd4ae1c --- /dev/null +++ b/include/ghex/device/event.hpp @@ -0,0 +1,37 @@ +/* + * ghex-org + * + * Copyright (c) 2014-2023, ETH Zurich + * All rights reserved. + * + * Please, refer to the LICENSE file in the root directory. + * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include + +#if defined(GHEX_CUDACC) +#include +#else +namespace ghex +{ +namespace device +{ +struct cuda_event +{ + cuda_event() noexcept = default; + cuda_event(const cuda_event&) = delete; + cuda_event& operator=(const cuda_event&) = delete; + cuda_event(cuda_event&& other) noexcept = default; + cuda_event& operator=(cuda_event&&) noexcept = default; + ~cuda_event() noexcept = default; + + // By returning `true` we emulate the behaviour of a + // CUDA `stream` that has been moved. + constexpr operator bool() const noexcept { return true; } +}; + +} // namespace device +} // namespace ghex +#endif diff --git a/include/ghex/device/event_pool.hpp b/include/ghex/device/event_pool.hpp new file mode 100644 index 00000000..38d07bec --- /dev/null +++ b/include/ghex/device/event_pool.hpp @@ -0,0 +1,35 @@ +/* + * ghex-org + * + * Copyright (c) 2014-2023, ETH Zurich + * All rights reserved. + * + * Please, refer to the LICENSE file in the root directory. + * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include + +#if defined(GHEX_CUDACC) +#include +#else +namespace ghex +{ +namespace device +{ +struct event_pool +{ + public: // constructors + event_pool(std::size_t) {} + event_pool(const event_pool&) = delete; + event_pool& operator=(const event_pool&) = delete; + event_pool(event_pool&& other) noexcept = default; + event_pool& operator=(event_pool&&) noexcept = default; + + void rewind() {} + void clear() {} +}; +} // namespace device +} // namespace ghex +#endif diff --git a/include/ghex/device/stream.hpp b/include/ghex/device/stream.hpp index 934c24cc..0316dee1 100644 --- a/include/ghex/device/stream.hpp +++ b/include/ghex/device/stream.hpp @@ -21,7 +21,7 @@ namespace device struct stream { // default construct - stream() {} + stream() = default; stream(bool) {} // non-copyable @@ -32,6 +32,10 @@ struct stream stream(stream&& other) noexcept = default; stream& operator=(stream&&) noexcept = default; + // By returning `true` we emulate the behaviour of a + // CUDA `stream` that has been moved. + constexpr operator bool() const noexcept { return true; } + void sync() {} }; } // namespace device diff --git a/include/ghex/packer.hpp b/include/ghex/packer.hpp index 3c807a66..81a15c88 100644 --- a/include/ghex/packer.hpp +++ b/include/ghex/packer.hpp @@ -141,7 +141,7 @@ struct packer } std::vector stream_futures; stream_futures.reserve(num_streams); - num_streams = 0; + for (auto& p0 : map.send_memory) { for (auto& p1 : p0.second) @@ -155,10 +155,13 @@ struct packer (void*)(&p1.second.m_stream.get())); } stream_futures.push_back(future_type{&(p1.second), p1.second.m_stream}); - ++num_streams; } } } + //TODO: This is blocking, we wait until the whole packing has concluded and then + // we start the sending, which is in itself asynchronous. The best would be + // that this function here would also run asynchronous. + // However, it ensures that progress is made. await_futures(stream_futures, [&comm, &send_reqs](send_buffer_type* b) { send_reqs.push_back(comm.send(b->buffer, b->rank, b->tag)); }); } diff --git a/include/ghex/util/moved_bit.hpp b/include/ghex/util/moved_bit.hpp index 4f1a189a..0fee5947 100644 --- a/include/ghex/util/moved_bit.hpp +++ b/include/ghex/util/moved_bit.hpp @@ -19,18 +19,18 @@ struct moved_bit { bool m_moved = false; - moved_bit() = default; + moved_bit() noexcept = default; moved_bit(bool state) noexcept : m_moved{state} { } - moved_bit(const moved_bit&) = default; + moved_bit(const moved_bit&) noexcept = default; moved_bit(moved_bit&& other) noexcept : m_moved{std::exchange(other.m_moved, true)} { } - moved_bit& operator=(const moved_bit&) = default; + moved_bit& operator=(const moved_bit&) noexcept = default; moved_bit& operator=(moved_bit&& other) noexcept { m_moved = std::exchange(other.m_moved, true); diff --git a/test/bindings/python/test_unstructured_domain_descriptor.py b/test/bindings/python/test_unstructured_domain_descriptor.py index d637b9fe..c39d2de3 100644 --- a/test/bindings/python/test_unstructured_domain_descriptor.py +++ b/test/bindings/python/test_unstructured_domain_descriptor.py @@ -10,8 +10,12 @@ import pytest import numpy as np -# import cupy as cp +try: + import cupy as cp +except ImportError: + cp = None +import ghex from ghex.context import make_context from ghex.unstructured import make_communication_object from ghex.unstructured import DomainDescriptor @@ -210,8 +214,13 @@ LEVELS = 2 @pytest.mark.parametrize("dtype", [np.float64, np.float32, np.int32, np.int64]) +@pytest.mark.parametrize("on_gpu", [True, False]) @pytest.mark.mpi -def test_domain_descriptor(capsys, mpi_cart_comm, dtype): +def test_domain_descriptor(on_gpu, capsys, mpi_cart_comm, dtype): + + if on_gpu and cp is None: + pytest.skip(reason="`CuPy` is not installed.") + ctx = make_context(mpi_cart_comm, True) assert ctx.size() == 4 @@ -223,12 +232,85 @@ def test_domain_descriptor(capsys, mpi_cart_comm, dtype): assert domain_desc.size() == len(domains[ctx.rank()]["all"]) assert domain_desc.inner_size() == len(domains[ctx.rank()]["inner"]) - halo_gen = HaloGenerator.from_gids(domains[ctx.rank()]["outer"]) + def make_field(order): + # Creation is always on host. + data = np.zeros( + [len(domains[ctx.rank()]["all"]), LEVELS], dtype=dtype, order=order + ) + inner_set = set(domains[ctx.rank()]["inner"]) + all_list = domains[ctx.rank()]["all"] + for x in range(len(all_list)): + gid = all_list[x] + for l in range(LEVELS): + if gid in inner_set: + data[x, l] = ctx.rank() * 1000 + 10 * gid + l + else: + data[x, l] = -1 - pattern = make_pattern(ctx, halo_gen, [domain_desc]) + if on_gpu: + data = cp.array(data, order=order) + + field = make_field_descriptor(domain_desc, data) + return data, field + + def check_field(data, order): + if on_gpu: + # NOTE: Without the explicit order it fails sometimes. + data = cp.asnumpy(data, order=order) + inner_set = set(domains[ctx.rank()]["inner"]) + all_list = domains[ctx.rank()]["all"] + for x in range(len(all_list)): + gid = all_list[x] + for l in range(LEVELS): + if gid in inner_set: + assert data[x, l] == ctx.rank() * 1000 + 10 * gid + l + else: + assert ( + data[x, l] - 1000 * int((data[x, l]) / 1000) + ) == 10 * gid + l + + # TODO: Find out if there is a side effect that makes it important to keep them. + #field = make_field_descriptor(domain_desc, data) + #return data, field + halo_gen = HaloGenerator.from_gids(domains[ctx.rank()]["outer"]) + pattern = make_pattern(ctx, halo_gen, [domain_desc]) co = make_communication_object(ctx) + d1, f1 = make_field("C") + d2, f2 = make_field("F") + + handle = co.exchange([pattern(f1), pattern(f2)]) + handle.wait() + + check_field(d1, "C") + check_field(d2, "F") + + +@pytest.mark.parametrize("dtype", [np.float64, np.float32, np.int32, np.int64]) +@pytest.mark.parametrize("on_gpu", [True, False]) +@pytest.mark.mpi +def test_domain_descriptor_async(on_gpu, capsys, mpi_cart_comm, dtype): + + if on_gpu: + if cp is None: + pytest.skip(reason="`CuPy` is not installed.") + if not cp.is_available(): + pytest.skip(reason="`CuPy` is installed but no GPU could be found.") + if not ghex.__config__["gpu"]: + pytest.skip(reason="Skipping `schedule_exchange()` tests because `GHEX` was not compiled with GPU support") + + ctx = make_context(mpi_cart_comm, True) + assert ctx.size() == 4 + + domain_desc = DomainDescriptor( + ctx.rank(), domains[ctx.rank()]["all"], domains[ctx.rank()]["outer_lids"] + ) + + assert domain_desc.domain_id() == ctx.rank() + assert domain_desc.size() == len(domains[ctx.rank()]["all"]) + assert domain_desc.inner_size() == len(domains[ctx.rank()]["inner"]) + def make_field(order): data = np.zeros( [len(domains[ctx.rank()]["all"]), LEVELS], dtype=dtype, order=order @@ -242,13 +324,19 @@ def make_field(order): data[x, l] = ctx.rank() * 1000 + 10 * gid + l else: data[x, l] = -1 + if on_gpu: + data = cp.array(data, order=order) field = make_field_descriptor(domain_desc, data) return data, field - def check_field(data): + def check_field(data, order, stream): inner_set = set(domains[ctx.rank()]["inner"]) all_list = domains[ctx.rank()]["all"] + if on_gpu: + # NOTE: Without the explicit order it fails sometimes. + data = cp.asnumpy(data, order=order, stream=stream, blocking=True) + for x in range(len(all_list)): gid = all_list[x] for l in range(LEVELS): @@ -259,25 +347,23 @@ def check_field(data): data[x, l] - 1000 * int((data[x, l]) / 1000) ) == 10 * gid + l - field = make_field_descriptor(domain_desc, data) - return data, field + halo_gen = HaloGenerator.from_gids(domains[ctx.rank()]["outer"]) + pattern = make_pattern(ctx, halo_gen, [domain_desc]) + co = make_communication_object(ctx) d1, f1 = make_field("C") d2, f2 = make_field("F") - # np.set_printoptions(precision=8, suppress=True) - # with capsys.disabled(): - # print("") - # print(d1) + stream = cp.cuda.Stream(non_blocking=True) if on_gpu else None + handle = co.schedule_exchange(stream, [pattern(f1), pattern(f2)]) + assert not co.has_scheduled_exchange() - res = co.exchange([pattern(f1), pattern(f2)]) - res.wait() + handle.schedule_wait(stream) + assert co.has_scheduled_exchange() - # with capsys.disabled(): - # print("") - # print("") - # print("") - # print(d1) + check_field(d1, "C", stream) + check_field(d2, "F", stream) + assert co.has_scheduled_exchange() - check_field(d1) - check_field(d2) + handle.wait() + assert not co.has_scheduled_exchange() diff --git a/test/unstructured/test_user_concepts.cpp b/test/unstructured/test_user_concepts.cpp index 1fb6e02a..938081a0 100644 --- a/test/unstructured/test_user_concepts.cpp +++ b/test/unstructured/test_user_concepts.cpp @@ -36,6 +36,7 @@ void test_pattern_setup_oversubscribe(ghex::context& ctxt); void test_pattern_setup_oversubscribe_asymm(ghex::context& ctxt); void test_data_descriptor(ghex::context& ctxt, std::size_t levels, bool levels_first); +void test_data_descriptor_async(ghex::context& ctxt, std::size_t levels, bool levels_first); void test_data_descriptor_oversubscribe(ghex::context& ctxt); void test_data_descriptor_threads(ghex::context& ctxt); @@ -54,7 +55,6 @@ TEST_F(mpi_test_fixture, domain_descriptor) TEST_F(mpi_test_fixture, pattern_setup) { ghex::context ctxt{MPI_COMM_WORLD, thread_safe}; - if (world_size == 4) { test_pattern_setup(ctxt); } else if (world_size == 2) { @@ -81,8 +81,24 @@ TEST_F(mpi_test_fixture, data_descriptor) } } +TEST_F(mpi_test_fixture, data_descriptor_async) +{ + ghex::context ctxt{MPI_COMM_WORLD, thread_safe}; + + if (world_size == 4) + { + test_data_descriptor_async(ctxt, 1, true); + test_data_descriptor_async(ctxt, 3, true); + test_data_descriptor_async(ctxt, 1, false); + test_data_descriptor_async(ctxt, 3, false); + } +} + TEST_F(mpi_test_fixture, in_place_receive) { +#if 0 + // This test results in a segmentation fault. The error is + // also present on `master` (61f9ebbae4). ghex::context ctxt{MPI_COMM_WORLD, thread_safe}; if (world_size == 4) @@ -95,6 +111,7 @@ TEST_F(mpi_test_fixture, in_place_receive) //test_in_place_receive_oversubscribe(ctxt); if (thread_safe) test_in_place_receive_threads(ctxt); } +#endif } auto @@ -301,6 +318,92 @@ test_data_descriptor(ghex::context& ctxt, std::size_t levels, bool levels_first) #endif } +/** @brief Test data descriptor concept*/ +void +test_data_descriptor_async(ghex::context& ctxt, std::size_t levels, bool levels_first) +{ +#ifdef GHEX_CUDACC + // NOTE: Async exchange is only implemented for the GPU, however, we also + // test it for CPU memory, although it is kind of botherline. + + // domain + std::vector local_domains{make_domain(ctxt.rank())}; + + // halo generator + auto hg = make_halo_gen(local_domains); + + // setup patterns + auto patterns = ghex::make_pattern(ctxt, hg, local_domains); + + // communication object + using pattern_container_type = decltype(patterns); + auto co = ghex::make_communication_object(ctxt); + + // application data + auto& d = local_domains[0]; + ghex::test::util::memory field(d.size() * levels, 0); + initialize_data(d, field, levels, levels_first); + data_descriptor_cpu_int_type data{d, field, levels, levels_first}; + + EXPECT_NO_THROW({ + auto h = co.schedule_exchange(nullptr, patterns(data)); + h.schedule_wait(nullptr); + ASSERT_TRUE(co.has_scheduled_exchange()); + h.wait(); + ASSERT_FALSE(co.has_scheduled_exchange()); + }); + + auto h = co.schedule_exchange(nullptr, patterns(data)); + ASSERT_FALSE(co.has_scheduled_exchange()); + + h.schedule_wait(nullptr); + ASSERT_TRUE(co.has_scheduled_exchange()); + + // Check exchanged data. Because on CPU everything is synchronous we do not + // synchronize on the stream. + check_exchanged_data(d, field, patterns[0], levels, levels_first); + + h.wait(); + ASSERT_FALSE(co.has_scheduled_exchange()); + + // ----- GPU ----- + cudaStream_t stream; + GHEX_CHECK_CUDA_RESULT(cudaStreamCreate(&stream)); + GHEX_CHECK_CUDA_RESULT(cudaStreamSynchronize(stream)); + + // application data + initialize_data(d, field, levels, levels_first); + field.clone_to_device(); + data_descriptor_gpu_int_type data_gpu{d, field.device_data(), levels, levels_first, 0, 0}; + + EXPECT_NO_THROW({ + auto h = co.schedule_exchange(stream, patterns(data)); + h.schedule_wait(stream); + GHEX_CHECK_CUDA_RESULT(cudaStreamSynchronize(stream)); + ASSERT_TRUE(co.has_scheduled_exchange()); + h.wait(); + ASSERT_FALSE(co.has_scheduled_exchange()); + }); + + auto h_gpu = co.schedule_exchange(stream, patterns(data_gpu)); + ASSERT_FALSE(co.has_scheduled_exchange()); + + h_gpu.schedule_wait(stream); + ASSERT_TRUE(co.has_scheduled_exchange()); + + GHEX_CHECK_CUDA_RESULT(cudaStreamSynchronize(stream)); + ASSERT_TRUE(co.has_scheduled_exchange()); + + // check exchanged data + field.clone_to_host(); + check_exchanged_data(d, field, patterns[0], levels, levels_first); + ASSERT_TRUE(co.has_scheduled_exchange()); + + h.wait(); + ASSERT_FALSE(co.has_scheduled_exchange()); +#endif +} + /** @brief Test data descriptor concept*/ void test_data_descriptor_oversubscribe(ghex::context& ctxt)