From a4f5499503b41c90797c16f367aef805c8b2fa7c Mon Sep 17 00:00:00 2001
From: Mikael Simberg
Date: Wed, 22 Oct 2025 12:14:15 +0200
Subject: [PATCH 1/5] Update oomph for hwmalloc heap-config branch

---
 ext/oomph | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ext/oomph b/ext/oomph
index 37db2ca5..d53e1ede 160000
--- a/ext/oomph
+++ b/ext/oomph
@@ -1 +1 @@
-Subproject commit 37db2ca5c7c11050b66fcd11c90e7436f0b8ff39
+Subproject commit d53e1edef937b9a6a102dbced958edd382461954

From 4746b35f73507f05a8fbc3b4011a2d8b7f346ac3 Mon Sep 17 00:00:00 2001
From: Mikael Simberg
Date: Fri, 24 Oct 2025 11:46:11 +0200
Subject: [PATCH 2/5] Update oomph submodules

---
 ext/oomph | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ext/oomph b/ext/oomph
index d53e1ede..7055358c 160000
--- a/ext/oomph
+++ b/ext/oomph
@@ -1 +1 @@
-Subproject commit d53e1edef937b9a6a102dbced958edd382461954
+Subproject commit 7055358ca2a9c136a61603448a7b93c49d54ee3e

From d26c05ae640190f46e309cd916eeb72f5398a263 Mon Sep 17 00:00:00 2001
From: Mikael Simberg
Date: Fri, 24 Oct 2025 14:28:35 +0200
Subject: [PATCH 3/5] Refactor pack/unpack kernels

---
 include/ghex/unstructured/user_concepts.hpp | 157 +++++++++++++++-----
 1 file changed, 121 insertions(+), 36 deletions(-)

diff --git a/include/ghex/unstructured/user_concepts.hpp b/include/ghex/unstructured/user_concepts.hpp
index 280872a2..66becac5 100644
--- a/include/ghex/unstructured/user_concepts.hpp
+++ b/include/ghex/unstructured/user_concepts.hpp
@@ -454,39 +454,71 @@ class data_descriptor
 #ifdef GHEX_CUDACC
 
-#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK 32
+#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X 32
+#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y 8
 
 template<typename T>
 __global__ void
-pack_kernel(const T* values, const std::size_t local_indices_size,
+pack_kernel_levels_first(const T* values, const std::size_t local_indices_size,
     const std::size_t* local_indices, const std::size_t levels, T* buffer,
-    const std::size_t index_stride, const std::size_t level_stride,
-    const std::size_t buffer_index_stride, const std::size_t buffer_level_stride)
+    const std::size_t index_stride, const std::size_t buffer_index_stride)
+{
+    const std::size_t level = threadIdx.x + (blockIdx.x * blockDim.x);
+    const std::size_t idx = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
+    {
+        auto const local_index = local_indices[idx];
+        buffer[idx * buffer_index_stride + level] = values[local_index * index_stride + level];
+    }
+}
+
+template<typename T>
+__global__ void
+pack_kernel_levels_last(const T* values, const std::size_t local_indices_size,
+    const std::size_t* local_indices, const std::size_t levels, T* buffer,
+    const std::size_t level_stride, const std::size_t buffer_level_stride)
 {
     const std::size_t idx = threadIdx.x + (blockIdx.x * blockDim.x);
-    if (idx < local_indices_size)
+    const std::size_t level = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
     {
-        for (std::size_t level = 0; level < levels; ++level)
-        {
-            buffer[idx * buffer_index_stride + level * buffer_level_stride] = values[local_indices[idx] * index_stride + level * level_stride];
-        }
+        auto const local_index = local_indices[idx];
+        buffer[idx + level * buffer_level_stride] = values[local_index + level * level_stride];
     }
 }
 
 template<typename T>
 __global__ void
-unpack_kernel(const T* buffer, const std::size_t local_indices_size,
+unpack_kernel_levels_first(const T* buffer, const std::size_t local_indices_size,
     const std::size_t* local_indices, const std::size_t levels, T* values,
-    const std::size_t index_stride, const std::size_t level_stride,
-    const std::size_t buffer_index_stride, const std::size_t buffer_level_stride)
+    const std::size_t index_stride, const std::size_t buffer_index_stride)
+{
+    const std::size_t level = threadIdx.x + (blockIdx.x * blockDim.x);
+    const std::size_t idx = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
+    {
+        auto const local_index = local_indices[idx];
+        values[local_index * index_stride + level] = buffer[idx * buffer_index_stride + level];
+    }
+}
+
+template<typename T>
+__global__ void
+unpack_kernel_levels_last(const T* buffer, const std::size_t local_indices_size,
+    const std::size_t* local_indices, const std::size_t levels, T* values,
+    const std::size_t level_stride, const std::size_t buffer_level_stride)
 {
     const std::size_t idx = threadIdx.x + (blockIdx.x * blockDim.x);
-    if (idx < local_indices_size)
+    const std::size_t level = threadIdx.y + (blockIdx.y * blockDim.y);
+
+    if (idx < local_indices_size && level < levels)
     {
-        for (std::size_t level = 0; level < levels; ++level)
-        {
-            values[local_indices[idx] * index_stride + level * level_stride] = buffer[idx * buffer_index_stride + level * buffer_level_stride];
-        }
+        auto const local_index = local_indices[idx];
+        values[local_index + level * level_stride] = buffer[idx + level * buffer_level_stride];
     }
 }
@@ -522,7 +554,8 @@ class data_descriptor
      * @param outer_stride outer dimension's stride measured in number of elements of type T (special value 0: no padding)
      * @param device_id device id*/
     data_descriptor(const domain_descriptor_type& domain, value_type* field,
-        std::size_t levels = 1u, bool levels_first = true, std::size_t outer_stride = 0u, device_id_type device_id = arch_traits<gpu>::current_id())
+        std::size_t levels = 1u, bool levels_first = true, std::size_t outer_stride = 0u,
+        device_id_type device_id = arch_traits<gpu>::current_id())
     : m_device_id{device_id}
     , m_domain_id{domain.domain_id()}
     , m_domain_size{domain.size()}
@@ -549,34 +582,86 @@ class data_descriptor
     template<typename IndexContainer>
     void pack(value_type* buffer, const IndexContainer& c, void* stream_ptr)
     {
+        const dim3 threads_per_block(GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X,
+            GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y);
+
         for (const auto& is : c)
         {
-            const int n_blocks =
-                static_cast<int>(std::ceil(static_cast<double>(is.local_indices().size()) /
-                    GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK));
-            const std::size_t buffer_index_stride = m_levels_first ? m_levels : 1u;
-            const std::size_t buffer_level_stride = m_levels_first ? 1u : is.local_indices().size();
-            pack_kernel<<<n_blocks, GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK, 0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(m_values,
-                is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
-                m_index_stride, m_level_stride, buffer_index_stride, buffer_level_stride);
+            if (m_levels_first)
+            {
+                const int blocks_levels = static_cast<int>(
+                    std::ceil(static_cast<double>(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_indices = static_cast<int>(
+                    std::ceil(static_cast<double>(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_levels, blocks_indices);
+
+                pack_kernel_levels_first<<<blocks, threads_per_block, 0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(m_values,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
+                    m_index_stride, m_levels);
+            }
+            else
+            {
+                const int blocks_indices = static_cast<int>(
+                    std::ceil(static_cast<double>(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_levels = static_cast<int>(
+                    std::ceil(static_cast<double>(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_indices, blocks_levels);
+
+                pack_kernel_levels_last<<<blocks, threads_per_block, 0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(m_values,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
+                    m_level_stride, is.local_indices().size());
+            }
         }
     }
 
     template<typename IndexContainer>
     void unpack(const value_type* buffer, const IndexContainer& c, void* stream_ptr)
     {
+        const dim3 threads_per_block(GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X,
+            GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y);
+
         for (const auto& is : c)
         {
-            const int n_blocks =
-                static_cast<int>(std::ceil(static_cast<double>(is.local_indices().size()) /
-                    GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK));
-            const std::size_t buffer_index_stride = m_levels_first ? m_levels : 1u;
-            const std::size_t buffer_level_stride = m_levels_first ? 1u : is.local_indices().size();
-            unpack_kernel<<<n_blocks, GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK, 0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(buffer,
-                is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
-                m_index_stride, m_level_stride, buffer_index_stride, buffer_level_stride);
+            if (m_levels_first)
+            {
+                const int blocks_levels = static_cast<int>(
+                    std::ceil(static_cast<double>(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_indices = static_cast<int>(
+                    std::ceil(static_cast<double>(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_levels, blocks_indices);
+
+                unpack_kernel_levels_first<<<blocks, threads_per_block, 0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(buffer,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
+                    m_index_stride, m_levels);
+            }
+            else
+            {
+                const int blocks_indices = static_cast<int>(
+                    std::ceil(static_cast<double>(is.local_indices().size()) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
+                const int blocks_levels = static_cast<int>(
+                    std::ceil(static_cast<double>(m_levels) /
+                        GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));
+
+                const dim3 blocks(blocks_indices, blocks_levels);
+
+                unpack_kernel_levels_last<<<blocks, threads_per_block, 0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(buffer,
+                    is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
+                    m_level_stride, is.local_indices().size());
+            }
         }
     }
 };
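
[Note on PATCH 3/5] The old kernels used one thread per halo index and looped over all levels inside the thread. The new kernels use a 2D thread grid (32x8 threads per block) and put the contiguous dimension (levels for levels-first storage, halo indices for levels-last) on threadIdx.x, so consecutive threads read and write consecutive field and buffer elements and the accesses coalesce. The standalone sketch below shows the same indexing and ceil-divide launch arithmetic for the levels-first case, with a host-side check; all names and sizes are illustrative and it is not part of the patch:

    // Levels-first packing with a 2D grid: level on x (contiguous), halo
    // index on y, mirroring pack_kernel_levels_first above.
    #include <cstddef>
    #include <cstdio>
    #include <cuda_runtime.h>

    __global__ void pack_levels_first(const double* values, std::size_t n_indices,
        const std::size_t* indices, std::size_t levels, double* buffer,
        std::size_t index_stride, std::size_t buffer_index_stride)
    {
        const std::size_t level = threadIdx.x + blockIdx.x * blockDim.x;
        const std::size_t idx = threadIdx.y + blockIdx.y * blockDim.y;
        if (idx < n_indices && level < levels)
            buffer[idx * buffer_index_stride + level] =
                values[indices[idx] * index_stride + level];
    }

    int main()
    {
        const std::size_t levels = 5;
        const std::size_t domain = 8;          // field has 8 locations
        const std::size_t halo[] = {1, 3, 6};  // locations to pack
        const std::size_t n = sizeof(halo) / sizeof(halo[0]);

        double h_values[domain * levels];
        for (std::size_t i = 0; i < domain * levels; ++i) h_values[i] = double(i);

        double* d_values;
        double* d_buffer;
        std::size_t* d_indices;
        cudaMalloc(&d_values, sizeof(h_values));
        cudaMalloc(&d_buffer, n * levels * sizeof(double));
        cudaMalloc(&d_indices, sizeof(halo));
        cudaMemcpy(d_values, h_values, sizeof(h_values), cudaMemcpyHostToDevice);
        cudaMemcpy(d_indices, halo, sizeof(halo), cudaMemcpyHostToDevice);

        // Ceil-divide each extent by the block dimension that covers it,
        // exactly as in pack()/unpack() above.
        const dim3 threads_per_block(32, 8);
        const dim3 blocks((levels + 32 - 1) / 32, (n + 8 - 1) / 8);
        pack_levels_first<<<blocks, threads_per_block>>>(d_values, n, d_indices,
            levels, d_buffer, levels, levels);

        double h_buffer[n * levels];
        cudaMemcpy(h_buffer, d_buffer, sizeof(h_buffer), cudaMemcpyDeviceToHost);

        bool ok = true;
        for (std::size_t i = 0; i < n; ++i)
            for (std::size_t l = 0; l < levels; ++l)
                ok = ok && h_buffer[i * levels + l] == h_values[halo[i] * levels + l];
        std::printf("pack %s\n", ok ? "ok" : "FAILED");

        cudaFree(d_values);
        cudaFree(d_buffer);
        cudaFree(d_indices);
        return ok ? 0 : 1;
    }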

From 1add6ed78bda8f1da37168125c54519e7fa80327 Mon Sep 17 00:00:00 2001
From: Mikael Simberg
Date: Mon, 3 Nov 2025 12:48:07 +0100
Subject: [PATCH 4/5] Don't wait for streams to finish unpacking

---
 include/ghex/communication_object.hpp | 14 +++++++++++-
 include/ghex/device/cuda/stream.hpp   | 32 ++++++++++++++++++++++-----
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/include/ghex/communication_object.hpp b/include/ghex/communication_object.hpp
index d49cd1a4..99e08ed3 100644
--- a/include/ghex/communication_object.hpp
+++ b/include/ghex/communication_object.hpp
@@ -515,6 +515,10 @@ class communication_object
   private:
     // synchronize (unpacking) streams
     void sync_streams()
     {
+        constexpr std::size_t num_events{128};
+        static std::vector<device::cuda_event> events(num_events);
+        static std::size_t event_index{0};
+
         using gpu_mem_t = buffer_memory<gpu>;
         auto& m = std::get<gpu_mem_t>(m_mem);
         for (auto& p0 : m.recv_memory)
@@ -523,7 +527,15 @@ class communication_object
         {
             for (auto& p1 : p0.second)
             {
                 if (p1.second.size > 0u)
                 {
-                    p1.second.m_stream.sync();
+                    // p1.second.m_stream.sync();
+                    // Instead of doing a blocking wait, create events on each
+                    // stream that the default stream waits for. This assumes
+                    // that all kernels that need the unpacked data will use or
+                    // synchronize with the default stream.
+                    cudaEvent_t& e = events[event_index].get();
+                    event_index = (event_index + 1) % num_events;
+                    GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, p1.second.m_stream.get()));
+                    GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(0, e));
                 }
             }
         }
diff --git a/include/ghex/device/cuda/stream.hpp b/include/ghex/device/cuda/stream.hpp
index 5aa75ef0..eb5ea37a 100644
--- a/include/ghex/device/cuda/stream.hpp
+++ b/include/ghex/device/cuda/stream.hpp
@@ -19,17 +19,41 @@ namespace ghex
 {
 namespace device
 {
+struct cuda_event
+{
+    cudaEvent_t m_event;
+    ghex::util::moved_bit m_moved;
+
+    cuda_event()
+    {
+        GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming))
+    }
+
+    cuda_event(const cuda_event&) = delete;
+    cuda_event& operator=(const cuda_event&) = delete;
+    cuda_event(cuda_event&&) = default;
+    cuda_event& operator=(cuda_event&&) = default;
+
+    ~cuda_event()
+    {
+        if (!m_moved)
+        {
+            GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event))
+        }
+    }
+
+    operator bool() const noexcept { return m_moved; }
+    operator cudaEvent_t() const noexcept { return m_event; }
+    cudaEvent_t& get() noexcept { return m_event; }
+    const cudaEvent_t& get() const noexcept { return m_event; }
+};
+
 /** @brief thin wrapper around a cuda stream */
 struct stream
 {
     cudaStream_t m_stream;
-    cudaEvent_t m_event;
     ghex::util::moved_bit m_moved;
 
     stream()
     {
         GHEX_CHECK_CUDA_RESULT(cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking))
-        GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming))
     }
 
     stream(const stream&) = delete;
@@ -42,7 +66,6 @@ struct stream
         if (!m_moved)
         {
             GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaStreamDestroy(m_stream))
-            GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event))
         }
     }
 
@@ -55,9 +78,8 @@ struct stream
 
     void sync()
     {
-        GHEX_CHECK_CUDA_RESULT(cudaEventRecord(m_event, m_stream))
         // busy wait here
-        GHEX_CHECK_CUDA_RESULT(cudaEventSynchronize(m_event))
+        GHEX_CHECK_CUDA_RESULT(cudaStreamSynchronize(m_stream))
     }
 };
 } // namespace device
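
[Note on PATCH 4/5] sync_streams() previously blocked the host once per receive buffer until the unpack stream had drained. With this change the ordering is enforced on the device instead: an event is recorded on each unpack stream and the default (legacy) stream is told to wait on it, so the host returns immediately and any later kernel on the default stream still sees the unpacked data. Recycling events round-robin from the static pool is safe because cudaStreamWaitEvent captures the state of the event at the time of the call; a later cudaEventRecord on the same slot does not disturb waits that are already enqueued. A minimal plain-CUDA sketch of the pattern (kernel and variable names are illustrative, not part of the patch):

    #include <cstdio>
    #include <cuda_runtime.h>

    __global__ void produce(int* data) { data[threadIdx.x] = threadIdx.x; }
    __global__ void consume(const int* data, int* out) { out[threadIdx.x] = 2 * data[threadIdx.x]; }

    int main()
    {
        int* data;
        int* out;
        cudaMalloc(&data, 32 * sizeof(int));
        cudaMalloc(&out, 32 * sizeof(int));

        // Non-blocking stream, like ghex::device::stream: no implicit
        // ordering against the legacy default stream.
        cudaStream_t unpack_stream;
        cudaStreamCreateWithFlags(&unpack_stream, cudaStreamNonBlocking);
        cudaEvent_t e;
        cudaEventCreateWithFlags(&e, cudaEventDisableTiming);

        produce<<<1, 32, 0, unpack_stream>>>(data);  // stands in for unpacking
        cudaEventRecord(e, unpack_stream);           // mark completion of unpack work
        cudaStreamWaitEvent(0, e, 0);                // default stream waits, on the GPU

        // The host never blocked above; this kernel on the default stream is
        // nevertheless guaranteed to see the data written on unpack_stream.
        consume<<<1, 32>>>(data, out);

        int h_out[32];
        cudaMemcpy(h_out, out, sizeof(h_out), cudaMemcpyDeviceToHost);
        std::printf("out[5] = %d (expected 10)\n", h_out[5]);

        cudaEventDestroy(e);
        cudaStreamDestroy(unpack_stream);
        cudaFree(data);
        cudaFree(out);
    }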

From 6d896166994cedbcfc50da1873239a5edb212e3f Mon Sep 17 00:00:00 2001
From: Mikael Simberg
Date: Mon, 3 Nov 2025 12:57:29 +0100
Subject: [PATCH 5/5] Add dependency on default stream before starting packing

---
 include/ghex/packer.hpp | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/include/ghex/packer.hpp b/include/ghex/packer.hpp
index a1475ad7..56903a84 100644
--- a/include/ghex/packer.hpp
+++ b/include/ghex/packer.hpp
@@ -123,6 +123,10 @@ struct packer
         using future_type = device::future<void>;
         std::size_t num_streams = 0;
 
+        constexpr std::size_t num_events{128};
+        static std::vector<device::cuda_event> events(num_events);
+        static std::size_t event_index{0};
+
         for (auto& p0 : map.send_memory)
         {
             const auto device_id = p0.first;
@@ -141,12 +145,24 @@ struct packer
         std::vector<future_type> stream_futures;
         stream_futures.reserve(num_streams);
         num_streams = 0;
+
+        // Assume that send memory synchronizes with the default
+        // stream, so schedule pack kernels after an event recorded
+        // on the default stream.
+        cudaEvent_t& e = events[event_index].get();
+        event_index = (event_index + 1) % num_events;
+        GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, 0));
+
         for (auto& p0 : map.send_memory)
         {
             for (auto& p1 : p0.second)
             {
                 if (p1.second.size > 0u)
                 {
+                    // Make sure the stream used for packing synchronizes
+                    // with the default stream.
+                    GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(p1.second.m_stream.get(), e));
+
                     for (const auto& fb : p1.second.field_infos)
                     {
                         device::guard g(p1.second.buffer);
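
[Note on PATCH 5/5] This patch is the mirror image of patch 4: before packing, each pack stream waits for an event recorded on the default stream; after unpacking, the default stream waits for events recorded on the unpack streams. Both directions are device-side dependencies only, so the host thread never blocks between enqueueing the pack kernels and consuming the unpacked data, at the cost of the stated assumption that producers and consumers of field data synchronize with the default stream. A condensed sketch of the shared pattern; the pool size of 128 mirrors num_events in the patches, everything else is illustrative:

    #include <cstddef>
    #include <cuda_runtime.h>

    // Round-robin pool, mirroring the static std::vector of events plus
    // event_index used in packer.hpp and communication_object.hpp.
    struct event_pool
    {
        static constexpr std::size_t size = 128;
        cudaEvent_t events[size];
        std::size_t next = 0;

        event_pool()
        {
            for (auto& e : events) cudaEventCreateWithFlags(&e, cudaEventDisableTiming);
        }
        ~event_pool()
        {
            for (auto& e : events) cudaEventDestroy(e);
        }
        event_pool(const event_pool&) = delete;
        event_pool& operator=(const event_pool&) = delete;

        cudaEvent_t get()
        {
            cudaEvent_t e = events[next];
            next = (next + 1) % size;
            return e;
        }
    };

    // Enqueue a device-side dependency: consumer waits for all work submitted
    // to producer so far; the host does not block.
    inline void add_dependency(cudaStream_t consumer, cudaStream_t producer, event_pool& pool)
    {
        cudaEvent_t e = pool.get();
        cudaEventRecord(e, producer);
        cudaStreamWaitEvent(consumer, e, 0);
    }

    // Usage corresponding to the two patches (pack_stream/unpack_stream stand
    // in for the per-buffer m_stream members):
    //   add_dependency(pack_stream, 0, pool);    // patch 5: packing after default stream
    //   add_dependency(0, unpack_stream, pool);  // patch 4: default stream after unpacking

With 128 slots an event slot is reused only after 127 further record/wait pairs have been enqueued, and the static pools in the patches are created lazily on first use and destroyed only at program exit.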