diff --git a/CMakeLists.txt b/CMakeLists.txt
index dae28823..1f3ffa41 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -52,6 +52,8 @@ set(GHEX_ENABLE_ATLAS_BINDINGS OFF CACHE BOOL "Set to true to build with Atlas b
 set(GHEX_BUILD_FORTRAN OFF CACHE BOOL "True if FORTRAN bindings shall be built")
 set(GHEX_BUILD_PYTHON_BINDINGS OFF CACHE BOOL "Set to true to build Python bindings")
 set(GHEX_WITH_TESTING OFF CACHE BOOL "True if tests shall be built")
+# TODO: Add FindNCCL.cmake module.
+set(GHEX_USE_NCCL ON CACHE BOOL "Use NCCL")
 
 # ---------------------------------------------------------------------
 # Common includes
diff --git a/cmake/config.hpp.in b/cmake/config.hpp.in
index 69761668..9f370893 100644
--- a/cmake/config.hpp.in
+++ b/cmake/config.hpp.in
@@ -21,6 +21,7 @@
 #cmakedefine GHEX_USE_XPMEM
 #cmakedefine GHEX_USE_XPMEM_ACCESS_GUARD
 #cmakedefine GHEX_USE_GPU
+#cmakedefine GHEX_USE_NCCL
 #define GHEX_GPU_MODE @ghex_gpu_mode@
 #cmakedefine GHEX_GPU_MODE_EMULATE
 #define @GHEX_DEVICE@
diff --git a/cmake/ghex_external_dependencies.cmake b/cmake/ghex_external_dependencies.cmake
index 32c40fe4..6779e406 100644
--- a/cmake/ghex_external_dependencies.cmake
+++ b/cmake/ghex_external_dependencies.cmake
@@ -94,6 +94,15 @@ if (GHEX_USE_XPMEM)
     find_package(XPMEM REQUIRED)
 endif()
 
+
+# ---------------------------------------------------------------------
+# nccl setup
+# ---------------------------------------------------------------------
+if(GHEX_USE_NCCL)
+    link_libraries("-lnccl")
+    # include_directories("")
+endif()
+
 # ---------------------------------------------------------------------
 # parmetis setup
 # ---------------------------------------------------------------------
diff --git a/ext/oomph b/ext/oomph
index 37db2ca5..7055358c 160000
--- a/ext/oomph
+++ b/ext/oomph
@@ -1 +1 @@
-Subproject commit 37db2ca5c7c11050b66fcd11c90e7436f0b8ff39
+Subproject commit 7055358ca2a9c136a61603448a7b93c49d54ee3e
diff --git a/include/ghex/communication_object.hpp b/include/ghex/communication_object.hpp
index d49cd1a4..6e0f4420 100644
--- a/include/ghex/communication_object.hpp
+++ b/include/ghex/communication_object.hpp
@@ -12,6 +12,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -24,6 +25,10 @@
 #include
 #include
 
+#ifdef GHEX_USE_NCCL
+#include <nccl.h>
+#endif
+
 namespace ghex
 {
 // forward declaration for optimization on regular grids
@@ -207,8 +212,12 @@ class communication_object
     using disable_if_buffer_info = std::enable_if_t::value, R>;
 
   private: // members
+    ghex::util::moved_bit m_moved;
     bool m_valid;
     communicator_type m_comm;
+#ifdef GHEX_USE_NCCL
+    ncclComm_t m_nccl_comm;
+#endif
     memory_type m_mem;
     std::vector m_send_reqs;
     std::vector m_recv_reqs;
@@ -218,12 +227,45 @@ class communication_object
     : m_valid(false)
     , m_comm(c.transport_context()->get_communicator())
     {
+        ncclUniqueId id;
+        if (m_comm.rank() == 0) {
+            ncclGetUniqueId(&id);
+        }
+        MPI_Comm mpi_comm = m_comm.mpi_comm();
+
+        MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, mpi_comm);
+
+        GHEX_CHECK_NCCL_RESULT(ncclCommInitRank(&m_nccl_comm, m_comm.size(), id, m_comm.rank()));
+        ncclResult_t state;
+        do {
+            GHEX_CHECK_NCCL_RESULT(ncclCommGetAsyncError(m_nccl_comm, &state));
+        } while(state == ncclInProgress);
+    }
+    ~communication_object() noexcept {
+        if (!m_moved) {
+            GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaDeviceSynchronize());
+            GHEX_CHECK_NCCL_RESULT_NO_THROW(ncclCommDestroy(m_nccl_comm));
+        }
     }
 
     communication_object(const communication_object&) = delete;
     communication_object(communication_object&&) = default;
 
     communicator_type& communicator() { return m_comm; }
+  private:
+    template
+    void nccl_exchange_impl(buffer_info_type... buffer_infos) {
+        pack_nccl();
+
+        ncclGroupStart();
+        post_sends_nccl();
+        post_recvs_nccl();
+        ncclGroupEnd();
+
+        unpack_nccl();
+    }
+
+  public:
     // exchange arbitrary field-device-pattern combinations
     /** @brief non-blocking exchange of halo data
      * @tparam Archs list of device types
@@ -234,8 +276,12 @@ class communication_object
     [[nodiscard]] handle_type exchange(buffer_info_type... buffer_infos)
     {
         exchange_impl(buffer_infos...);
+#ifdef GHEX_USE_NCCL
+        nccl_exchange_impl();
+#else
         post_recvs();
         pack();
+#endif
         return {this};
     }
 
@@ -248,7 +294,6 @@
     [[nodiscard]] disable_if_buffer_info exchange(
         Iterator first, Iterator last)
     {
-        // call special function for a single range
         return exchange_u(first, last);
     }
 
@@ -279,8 +324,12 @@
     [[nodiscard]] handle_type exchange(std::pair... iter_pairs)
     {
         exchange_impl(iter_pairs...);
+#ifdef GHEX_USE_NCCL
+        nccl_exchange_impl();
+#else
         post_recvs();
         pack();
+#endif
         return {this};
     }
 
@@ -462,6 +511,89 @@ class communication_object
         });
     }
 
+    void post_sends_nccl()
+    {
+        for_each(m_mem, [this](std::size_t, auto& map) {
+            for (auto& p0 : map.send_memory)
+            {
+                const auto device_id = p0.first;
+                for (auto& p1 : p0.second)
+                {
+                    if (p1.second.size > 0u)
+                    {
+                        device::guard g(p1.second.buffer);
+                        // TODO: Check why element size isn't relevant for the
+                        // buffer size (also for recv).
+                        GHEX_CHECK_NCCL_RESULT(
+                            ncclSend(static_cast(g.data()), p1.second.buffer.size(),
+                                ncclChar, p1.second.rank, m_nccl_comm, p1.second.m_stream.get()));
+                    }
+                }
+            }
+        });
+    }
+
+    void post_recvs_nccl()
+    {
+        for_each(m_mem, [this](std::size_t, auto& m) {
+            using arch_type = typename std::remove_reference_t::arch_type;
+            for (auto& p0 : m.recv_memory)
+            {
+                const auto device_id = p0.first;
+                for (auto& p1 : p0.second)
+                {
+                    if (p1.second.size > 0u)
+                    {
+                        if (!p1.second.buffer || p1.second.buffer.size() != p1.second.size
+#if defined(GHEX_USE_GPU) || defined(GHEX_GPU_MODE_EMULATE)
+                            || p1.second.buffer.device_id() != device_id
+#endif
+                        )
+                            p1.second.buffer = arch_traits::make_message(
+                                m_comm, p1.second.size, device_id);
+                        GHEX_CHECK_NCCL_RESULT(
+                            ncclRecv(p1.second.buffer.device_data(), p1.second.buffer.size(),
+                                ncclChar, p1.second.rank, m_nccl_comm, p1.second.m_stream.get()));
+                    }
+                }
+            }
+        });
+    }
+
+    void pack_nccl()
+    {
+        for_each(m_mem, [this](std::size_t, auto& m) {
+            using arch_type = typename std::remove_reference_t::arch_type;
+            packer::pack2_nccl(m, m_send_reqs, m_comm);
+        });
+    }
+
+    void unpack_nccl()
+    {
+        for_each(m_mem, [this](std::size_t, auto& m) {
+            using arch_type = typename std::remove_reference_t::arch_type;
+            for (auto& p0 : m.recv_memory)
+            {
+                const auto device_id = p0.first;
+                for (auto& p1 : p0.second)
+                {
+                    if (p1.second.size > 0u)
+                    {
+                        if (!p1.second.buffer || p1.second.buffer.size() != p1.second.size
+#if defined(GHEX_USE_GPU) || defined(GHEX_GPU_MODE_EMULATE)
+                            || p1.second.buffer.device_id() != device_id
+#endif
+                        )
+                            p1.second.buffer = arch_traits::make_message(
+                                m_comm, p1.second.size, device_id);
+                        device::guard g(p1.second.buffer);
+                        packer::unpack(p1.second, g.data());
+                    }
+                }
+            }
+        });
+    }
+
     void pack()
     {
         for_each(m_mem, [this](std::size_t, auto& m) {
@@ -473,12 +605,19 @@
   private: // wait functions
     void progress()
     {
+#ifdef GHEX_USE_NCCL
+        // TODO: No progress needed?
+#else
         if (!m_valid) return;
         m_comm.progress();
+#endif
     }
 
     bool is_ready()
     {
+#ifdef GHEX_USE_NCCL
+        // TODO: Check if streams are idle?
+#else
         if (!m_valid) return true;
         if (m_comm.is_ready())
         {
@@ -497,14 +636,17 @@
             clear();
             return true;
         }
+#endif
         return false;
     }
 
     void wait()
     {
+#ifndef GHEX_USE_NCCL
         if (!m_valid) return;
         // wait for data to arrive (unpack callback will be invoked)
         m_comm.wait_all();
+#endif
 #ifdef GHEX_CUDACC
         sync_streams();
 #endif
@@ -515,6 +657,10 @@
   private: // synchronize (unpacking) streams
     void sync_streams()
     {
+        constexpr std::size_t num_events{128};
+        static std::vector events(num_events);
+        static std::size_t event_index{0};
+
         using gpu_mem_t = buffer_memory;
         auto& m = std::get(m_mem);
         for (auto& p0 : m.recv_memory)
@@ -523,7 +669,18 @@
             {
                 if (p1.second.size > 0u)
                 {
+#ifdef GHEX_USE_NCCL
+                    // Instead of doing a blocking wait, create events on each
+                    // stream that the default stream waits for. This assumes
+                    // that all kernels that need the unpacked data will use or
+                    // synchronize with the default stream.
+                    cudaEvent_t& e = events[event_index].get();
+                    event_index = (event_index + 1) % num_events;
+                    GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, p1.second.m_stream.get()));
+                    GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(0, e));
+#else
                     p1.second.m_stream.sync();
+#endif
                 }
             }
         }
diff --git a/include/ghex/device/cuda/stream.hpp b/include/ghex/device/cuda/stream.hpp
index 5aa75ef0..eb5ea37a 100644
--- a/include/ghex/device/cuda/stream.hpp
+++ b/include/ghex/device/cuda/stream.hpp
@@ -19,17 +19,41 @@
 namespace ghex {
 namespace device {
 
+struct cuda_event {
+    cudaEvent_t m_event;
+    ghex::util::moved_bit m_moved;
+
+    cuda_event() {
+        GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming))
+    }
+    cuda_event(const cuda_event&) = delete;
+    cuda_event& operator=(const cuda_event&) = delete;
+    cuda_event(cuda_event&& other) = default;
+    cuda_event& operator=(cuda_event&&) = default;
+
+    ~cuda_event()
+    {
+        if (!m_moved)
+        {
+            GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event))
+        }
+    }
+
+    operator bool() const noexcept { return m_moved; }
+    operator cudaEvent_t() const noexcept { return m_event; }
+    cudaEvent_t& get() noexcept { return m_event; }
+    const cudaEvent_t& get() const noexcept { return m_event; }
+};
+
 /** @brief thin wrapper around a cuda stream */
 struct stream
 {
     cudaStream_t m_stream;
-    cudaEvent_t m_event;
     ghex::util::moved_bit m_moved;
 
     stream()
     {
         GHEX_CHECK_CUDA_RESULT(cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking))
-        GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming))
     }
 
     stream(const stream&) = delete;
@@ -42,7 +66,6 @@ struct stream
         if (!m_moved)
         {
             GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaStreamDestroy(m_stream))
-            GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event))
         }
     }
 
@@ -55,9 +78,8 @@ struct stream
 
     void sync()
     {
-        GHEX_CHECK_CUDA_RESULT(cudaEventRecord(m_event, m_stream))
         // busy wait here
-        GHEX_CHECK_CUDA_RESULT(cudaEventSynchronize(m_event))
+        GHEX_CHECK_CUDA_RESULT(cudaStreamSynchronize(m_stream))
     }
 };
 } // namespace device
diff --git a/include/ghex/packer.hpp b/include/ghex/packer.hpp
index a1475ad7..b70409ef 100644
--- a/include/ghex/packer.hpp
+++ b/include/ghex/packer.hpp
@@ -20,6 +20,19 @@
 #include
 #endif
 
+#ifdef GHEX_USE_NCCL
+#include <nccl.h>
+
+#define GHEX_CHECK_NCCL_RESULT(x) \
+    if (x != ncclSuccess && x != ncclInProgress) \
+        throw std::runtime_error(std::string("nccl call failed (") + std::to_string(x) + "):" + ncclGetErrorString(x));
+#define GHEX_CHECK_NCCL_RESULT_NO_THROW(x) \
+    if (x != ncclSuccess && x != ncclInProgress) { \
+        std::cerr << "nccl call failed (" << std::to_string(x) << "): " << ncclGetErrorString(x) << '\n'; \
+        std::terminate(); \
+    }
+#endif
+
 #include
 namespace ghex
 {
@@ -51,6 +64,15 @@ struct packer
         }
     }
 
+    template
+    static void pack2(Map& map, Requests& send_reqs, Communicator& comm)
+    {
+        pack(map, send_reqs, comm);
+    }
+
+    template
+    static void pack2_nccl(Map&, Requests&, Communicator&) {}
+
     template
     static void unpack(Buffer& buffer, unsigned char* data)
     {
@@ -163,6 +185,95 @@ struct packer
         });
     }
 
+    template
+    static void pack2_nccl(Map& map, Requests&, Communicator& comm)
+    {
+#if 0
+        constexpr std::size_t num_extra_streams{32};
+        static std::vector streams(num_extra_streams);
+        static std::size_t stream_index{0};
+#endif
+
+        constexpr std::size_t num_events{128};
+        static std::vector events(num_events);
+        static std::size_t event_index{0};
+
+        // Assume that send memory synchronizes with the default
+        // stream so schedule pack kernels after an event on the
+        // default stream.
+        cudaEvent_t& e = events[event_index].get();
+        event_index = (event_index + 1) % num_events;
+        GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, 0));
+        for (auto& p0 : map.send_memory)
+        {
+            const auto device_id = p0.first;
+            for (auto& p1 : p0.second)
+            {
+                if (p1.second.size > 0u)
+                {
+                    if (!p1.second.buffer || p1.second.buffer.size() != p1.second.size ||
+                        p1.second.buffer.device_id() != device_id)
+                    {
+                        // std::cerr << "pack2_nccl: making message\n";
+                        p1.second.buffer =
+                            arch_traits::make_message(comm, p1.second.size, device_id);
+                    }
+
+                    device::guard g(p1.second.buffer);
+#if 0
+                    int count = 0;
+#endif
+                    // Make sure stream used for packing synchronizes with the
+                    // default stream.
+                    GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(p1.second.m_stream.get(), e));
+                    for (const auto& fb : p1.second.field_infos)
+                    {
+                        // TODO:
+                        // 1. launch pack kernels on separate streams for all data
+                        // 1. (alternative) pack them all into the same kernel
+                        // 2. trigger the send from a cuda host function
+                        // 3. don't wait for futures here, but mixed with polling mpi for receives
+#if 0
+                        if (count == 0) {
+#endif
+                            // std::cerr << "pack2_nccl: calling pack call_back\n";
+                            fb.call_back(g.data() + fb.offset, *fb.index_container, (void*)(&p1.second.m_stream.get()));
+#if 0
+                        } else {
+                            cudaStream_t& s = streams[stream_index].get();
+                            stream_index = (stream_index + 1) % num_extra_streams;
+
+                            cudaEvent_t& e = events[event_index].get();
+                            event_index = (event_index + 1) % num_events;
+
+                            fb.call_back(g.data() + fb.offset, *fb.index_container, (void*)(&s));
+
+                            // Use the main stream only to synchronize. Launch
+                            // the work on a separate stream and insert an event
+                            // to allow waiting for all work on the main stream.
+                            GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, s));
+                            GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(p1.second.m_stream.get(), e));
+                        }
+                        ++count;
+#endif
+                    }
+
+                    // Warning: tag is not used. Messages have to be correctly ordered.
+                    // This is just for debugging, don't do mpi and nccl send
+                    // std::cerr << "pack2_nccl: triggering mpi_isend\n";
+                    // comm.send(p1.second.buffer, p1.second.rank, p1.second.tag);
+                    // std::cerr << "pack2_nccl: triggering ncclSend\n";
+                    // std::cerr << "pack2_nccl: ptr is " << static_cast(p1.second.buffer.device_data()) << "\n";
+                    // std::cerr << "pack2_nccl: g.data() is " << static_cast(g.data()) << "\n";
+                    // std::cerr << "pack2_nccl: size is " << p1.second.buffer.size() << "\n";
+                    // std::cerr << "pack2_nccl: ptr on device " << p1.second.buffer.on_device() << "\n";
+                    // GHEX_CHECK_NCCL_RESULT(ncclSend(static_cast(g.data()), p1.second.buffer.size() /* * sizeof(typename decltype(p1.second.buffer)::value_type) */, ncclChar, p1.second.rank, nccl_comm, p1.second.m_stream.get()));
+                    // std::cerr << "pack2_nccl: triggered ncclSend\n";
+                }
+            }
+        }
+    }
+
     template
     static void unpack(Buffer& buffer, unsigned char* data)
     {
diff --git a/include/ghex/unstructured/user_concepts.hpp b/include/ghex/unstructured/user_concepts.hpp
index 280872a2..01214880 100644
--- a/include/ghex/unstructured/user_concepts.hpp
+++ b/include/ghex/unstructured/user_concepts.hpp
@@ -454,39 +454,71 @@ class data_descriptor
 
 #ifdef GHEX_CUDACC
 
-#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK 32
+#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X 32
+#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y 1
 
 template
 __global__ void
-pack_kernel(const T* values, const std::size_t local_indices_size,
+pack_kernel_levels_first(const T* values, const std::size_t local_indices_size,
     const std::size_t* local_indices, const std::size_t levels, T* buffer,
-    const std::size_t index_stride, const std::size_t level_stride,
-    const std::size_t buffer_index_stride, const std::size_t buffer_level_stride)
+    const std::size_t index_stride, const std::size_t buffer_index_stride)
+{
+    const std::size_t level = threadIdx.x + (blockIdx.x * blockDim.x);
+    const std::size_t idx = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
+    {
+        auto const local_index = local_indices[idx];
+        buffer[idx * buffer_index_stride + level] = values[local_index * index_stride + level];
+    }
+}
+
+template
+__global__ void
+pack_kernel_levels_last(const T* values, const std::size_t local_indices_size,
+    const std::size_t* local_indices, const std::size_t levels, T* buffer,
+    const std::size_t level_stride, const std::size_t buffer_level_stride)
 {
     const std::size_t idx = threadIdx.x + (blockIdx.x * blockDim.x);
-    if (idx < local_indices_size)
+    const std::size_t level = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
     {
-        for (std::size_t level = 0; level < levels; ++level)
-        {
-            buffer[idx * buffer_index_stride + level * buffer_level_stride] = values[local_indices[idx] * index_stride + level * level_stride];
-        }
+        auto const local_index = local_indices[idx];
+        buffer[idx + level * buffer_level_stride] = values[local_index + level * level_stride];
     }
 }
 
 template
 __global__ void
-unpack_kernel(const T* buffer, const std::size_t local_indices_size,
+unpack_kernel_levels_first(const T* buffer, const std::size_t local_indices_size,
     const std::size_t* local_indices, const std::size_t levels, T* values,
-    const std::size_t index_stride, const std::size_t level_stride,
-    const std::size_t buffer_index_stride, const std::size_t buffer_level_stride)
+
+    const std::size_t index_stride, const std::size_t buffer_index_stride)
+{
+    const std::size_t level = threadIdx.x + (blockIdx.x * blockDim.x);
+    const std::size_t idx = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
+    {
+        auto const local_index = local_indices[idx];
+        values[local_index * index_stride + level] = buffer[idx * buffer_index_stride + level];
+    }
+}
+
+template
+__global__ void
+unpack_kernel_levels_last(const T* buffer, const std::size_t local_indices_size,
+    const std::size_t* local_indices, const std::size_t levels, T* values,
+    const std::size_t level_stride, const std::size_t buffer_level_stride)
 {
     const std::size_t idx = threadIdx.x + (blockIdx.x * blockDim.x);
-    if (idx < local_indices_size)
+    const std::size_t level = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
     {
-        for (std::size_t level = 0; level < levels; ++level)
-        {
-            values[local_indices[idx] * index_stride + level * level_stride] = buffer[idx * buffer_index_stride + level * buffer_level_stride];
-        }
+        auto const local_index = local_indices[idx];
+        values[local_index + level * level_stride] = buffer[idx + level * buffer_level_stride];
     }
 }
 
@@ -522,7 +554,8 @@ class data_descriptor
      * @param outer_stride outer dimension's stride measured in number of elements of type T (special value 0: no padding)
      * @param device_id device id*/
     data_descriptor(const domain_descriptor_type& domain, value_type* field,
-        std::size_t levels = 1u, bool levels_first = true, std::size_t outer_stride = 0u, device_id_type device_id = arch_traits::current_id())
+        std::size_t levels = 1u, bool levels_first = true, std::size_t outer_stride = 0u,
+        device_id_type device_id = arch_traits::current_id())
     : m_device_id{device_id}
     , m_domain_id{domain.domain_id()}
     , m_domain_size{domain.size()}
@@ -549,34 +582,86 @@ class data_descriptor
     template
     void pack(value_type* buffer, const IndexContainer& c, void* stream_ptr)
     {
+        const dim3 threads_per_block(GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X,
+            GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y);
+
         for (const auto& is : c)
         {
-            const int n_blocks =
-                static_cast(std::ceil(static_cast(is.local_indices().size()) /
-                    GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK));
-            const std::size_t buffer_index_stride = m_levels_first ? m_levels : 1u;
-            const std::size_t buffer_level_stride = m_levels_first ? 1u : is.local_indices().size();
-            pack_kernel<<(stream_ptr))>>>(m_values,
-                is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
-                m_index_stride, m_level_stride, buffer_index_stride, buffer_level_stride);
+            if (m_levels_first)
+            {
+                const int blocks_levels = static_cast(
+                    std::ceil(static_cast(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_indices = static_cast(
+                    std::ceil(static_cast(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_levels, blocks_indices);
+
+                pack_kernel_levels_first<<(stream_ptr))>>>(m_values,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
+                    m_index_stride, m_levels);
+            }
+            else
+            {
+                const int blocks_indices = static_cast(
+                    std::ceil(static_cast(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_levels = static_cast(
+                    std::ceil(static_cast(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_indices, blocks_levels);
+
+                pack_kernel_levels_last<<(stream_ptr))>>>(m_values,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
+                    m_level_stride, is.local_indices().size());
+            }
         }
     }
 
     template
     void unpack(const value_type* buffer, const IndexContainer& c, void* stream_ptr)
     {
+        const dim3 threads_per_block(GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X,
+            GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y);
+
         for (const auto& is : c)
         {
-            const int n_blocks =
-                static_cast(std::ceil(static_cast(is.local_indices().size()) /
-                    GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK));
-            const std::size_t buffer_index_stride = m_levels_first ? m_levels : 1u;
-            const std::size_t buffer_level_stride = m_levels_first ? 1u : is.local_indices().size();
-            unpack_kernel<<(stream_ptr))>>>(buffer,
-                is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
-                m_index_stride, m_level_stride, buffer_index_stride, buffer_level_stride);
+            if (m_levels_first)
+            {
+                const int blocks_levels = static_cast(
+                    std::ceil(static_cast(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_indices = static_cast(
+                    std::ceil(static_cast(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_levels, blocks_indices);
+
+                unpack_kernel_levels_first<<(stream_ptr))>>>(buffer,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
+                    m_index_stride, m_levels);
+            }
+            else
+            {
+                const int blocks_indices = static_cast(
+                    std::ceil(static_cast(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_levels = static_cast(
+                    std::ceil(static_cast(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_indices, blocks_levels);
+
+                unpack_kernel_levels_last<<(stream_ptr))>>>(buffer,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
+                    m_level_stride, is.local_indices().size());
+            }
         }
     }
 };
diff --git a/test/unstructured/test_user_concepts.cpp b/test/unstructured/test_user_concepts.cpp
index 35e4d0a3..31ce2f28 100644
--- a/test/unstructured/test_user_concepts.cpp
+++ b/test/unstructured/test_user_concepts.cpp
@@ -273,6 +273,7 @@ test_data_descriptor(ghex::context& ctxt, std::size_t levels, bool levels_first)
     // application data
     auto& d = local_domains[0];
     ghex::test::util::memory field(d.size()*levels, 0);
+#ifndef GHEX_USE_NCCL
     initialize_data(d, field, levels, levels_first);
     data_descriptor_cpu_int_type data{d, field, levels, levels_first};
 
@@ -283,6 +284,7 @@ test_data_descriptor(ghex::context& ctxt, std::size_t levels, bool levels_first)
 
     // check exchanged data
     check_exchanged_data(d, field, patterns[0], levels, levels_first);
+#endif
 
 #ifdef GHEX_CUDACC
     // application data
@@ -293,6 +295,9 @@ test_data_descriptor(ghex::context& ctxt, std::size_t levels, bool levels_first)
     EXPECT_NO_THROW(co.exchange(patterns(data_gpu)).wait());
 
     auto h_gpu = co.exchange(patterns(data_gpu));
+#ifdef GHEX_USE_NCCL
+    cudaDeviceSynchronize();
+#endif
     h_gpu.wait();
 
     // check exchanged data