14 changes: 13 additions & 1 deletion include/ghex/communication_object.hpp
@@ -515,6 +515,10 @@ class communication_object
private: // synchronize (unpacking) streams
void sync_streams()
{
constexpr std::size_t num_events{128};
static std::vector<device::cuda_event> events(num_events);
static std::size_t event_index{0};

using gpu_mem_t = buffer_memory<gpu>;
auto& m = std::get<gpu_mem_t>(m_mem);
for (auto& p0 : m.recv_memory)
@@ -523,7 +527,15 @@
{
if (p1.second.size > 0u)
{
p1.second.m_stream.sync();
// p1.second.m_stream.sync();
// Instead of doing a blocking wait, create events on each
// stream that the default stream waits for. This assumes
// that all kernels that need the unpacked data will use or
// synchronize with the default stream.
cudaEvent_t& e = events[event_index].get();
event_index = (event_index + 1) % num_events;
GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, p1.second.m_stream.get()));
GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(0, e));
}
}
}
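The change above replaces the blocking per-stream sync with an event-based handoff to the default stream. A minimal standalone sketch of the pattern, with illustrative names (not the GHEX API), assuming downstream kernels run on or synchronize with the legacy default stream:

```cpp
#include <cuda_runtime.h>
#include <cstddef>
#include <vector>

// Record an event on each unpack stream and make the default stream wait on it.
// The host never blocks; any kernel launched later on the default stream only
// starts once the unpacking on the recorded streams has finished.
inline void defer_sync_to_default_stream(const std::vector<cudaStream_t>& unpack_streams,
                                         std::vector<cudaEvent_t>&        event_pool,
                                         std::size_t&                     next_event)
{
    for (cudaStream_t s : unpack_streams)
    {
        cudaEvent_t e = event_pool[next_event];
        next_event = (next_event + 1) % event_pool.size();
        cudaEventRecord(e, s);        // marks the point where unpacking on s completes
        cudaStreamWaitEvent(0, e, 0); // default stream waits on the device, not the host
    }
}
```

Reusing a fixed pool of events round-robin avoids creating and destroying an event per exchange; re-recording a pool entry that is still pending is allowed in CUDA and does not affect waits that already captured its previous state.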
32 changes: 27 additions & 5 deletions include/ghex/device/cuda/stream.hpp
@@ -19,17 +19,41 @@ namespace ghex
{
namespace device
{
struct cuda_event {
cudaEvent_t m_event;
ghex::util::moved_bit m_moved;

cuda_event() {
GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming))
}
cuda_event(const cuda_event&) = delete;
cuda_event& operator=(const cuda_event&) = delete;
cuda_event(cuda_event&& other) = default;
cuda_event& operator=(cuda_event&&) = default;

~cuda_event()
{
if (!m_moved)
{
GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event))
}
}

operator bool() const noexcept { return m_moved; }
operator cudaEvent_t() const noexcept { return m_event; }
cudaEvent_t& get() noexcept { return m_event; }
const cudaEvent_t& get() const noexcept { return m_event; }
};

/** @brief thin wrapper around a cuda stream */
struct stream
{
cudaStream_t m_stream;
cudaEvent_t m_event;
ghex::util::moved_bit m_moved;

stream()
{
GHEX_CHECK_CUDA_RESULT(cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking))
GHEX_CHECK_CUDA_RESULT(cudaEventCreateWithFlags(&m_event, cudaEventDisableTiming))
}

stream(const stream&) = delete;
@@ -42,7 +66,6 @@ struct stream
if (!m_moved)
{
GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaStreamDestroy(m_stream))
GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event))
}
}

@@ -55,9 +78,8 @@ struct stream

void sync()
{
GHEX_CHECK_CUDA_RESULT(cudaEventRecord(m_event, m_stream))
// busy wait here
GHEX_CHECK_CUDA_RESULT(cudaEventSynchronize(m_event))
GHEX_CHECK_CUDA_RESULT(cudaStreamSynchronize(m_stream))
}
};
} // namespace device
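The new `cuda_event` wrapper follows the same move-only RAII idiom as `stream`: the destructor releases the handle only if the object has not been moved from. A standalone sketch of that idiom (illustrative type, not the GHEX classes; a plain bool stands in for `ghex::util::moved_bit`):

```cpp
#include <cuda_runtime.h>
#include <utility>

// Move-only owner of a cudaEvent_t; "owns" is cleared on move so that a
// moved-from object does not destroy the handle it no longer owns.
struct event_handle
{
    cudaEvent_t e{};
    bool        owns{true};

    event_handle() { cudaEventCreateWithFlags(&e, cudaEventDisableTiming); }
    event_handle(const event_handle&) = delete;
    event_handle& operator=(const event_handle&) = delete;
    event_handle(event_handle&& o) noexcept
    : e{o.e}
    , owns{std::exchange(o.owns, false)}
    {
    }
    event_handle& operator=(event_handle&& o) noexcept
    {
        if (this != &o)
        {
            if (owns) cudaEventDestroy(e);
            e = o.e;
            owns = std::exchange(o.owns, false);
        }
        return *this;
    }
    ~event_handle()
    {
        if (owns) cudaEventDestroy(e);
    }
};
```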
16 changes: 16 additions & 0 deletions include/ghex/packer.hpp
@@ -123,6 +123,10 @@ struct packer<gpu>
using future_type = device::future<send_buffer_type*>;
std::size_t num_streams = 0;

constexpr std::size_t num_events{128};
static std::vector<device::cuda_event> events(num_events);
static std::size_t event_index{0};

for (auto& p0 : map.send_memory)
{
const auto device_id = p0.first;
@@ -141,12 +145,24 @@
std::vector<future_type> stream_futures;
stream_futures.reserve(num_streams);
num_streams = 0;

// Assume that the send memory synchronizes with the default stream,
// so schedule the pack kernels after an event recorded on the
// default stream.
cudaEvent_t& e = events[event_index].get();
event_index = (event_index + 1) % num_events;
GHEX_CHECK_CUDA_RESULT(cudaEventRecord(e, 0));

for (auto& p0 : map.send_memory)
{
for (auto& p1 : p0.second)
{
if (p1.second.size > 0u)
{
// Make sure the stream used for packing synchronizes with the
// default stream.
GHEX_CHECK_CUDA_RESULT(cudaStreamWaitEvent(p1.second.m_stream.get(), e));

for (const auto& fb : p1.second.field_infos)
{
device::guard g(p1.second.buffer);
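The ordering established by the change above is: one event is recorded on the default stream, and every per-buffer pack stream waits on that event before its pack kernels run. A minimal sketch of the same dependency, with illustrative names (not the packer internals):

```cpp
#include <cuda_runtime.h>
#include <vector>

// The pack kernels must not read the field data before work previously
// enqueued on the default stream (e.g. the kernels that produced the halo
// data) has finished, so each pack stream waits on an event recorded there.
inline void order_pack_after_default_stream(cudaEvent_t                      fence,
                                            const std::vector<cudaStream_t>& pack_streams)
{
    cudaEventRecord(fence, 0); // snapshot of the work already on the default stream
    for (cudaStream_t s : pack_streams)
    {
        cudaStreamWaitEvent(s, fence, 0); // pack kernels launched on s start after the fence
        // ... launch pack kernels on s here ...
    }
}
```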
157 changes: 121 additions & 36 deletions include/ghex/unstructured/user_concepts.hpp
@@ -454,39 +454,71 @@ class data_descriptor<ghex::cpu, DomainId, Idx, T>

#ifdef GHEX_CUDACC

#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK 32
#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X 32
#define GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y 8

template<typename T>
__global__ void
pack_kernel(const T* values, const std::size_t local_indices_size,
pack_kernel_levels_first(const T* values, const std::size_t local_indices_size,
const std::size_t* local_indices, const std::size_t levels, T* buffer,
const std::size_t index_stride, const std::size_t level_stride,
const std::size_t buffer_index_stride, const std::size_t buffer_level_stride)
const std::size_t index_stride, const std::size_t buffer_index_stride)
{
const std::size_t level = threadIdx.x + (blockIdx.x * blockDim.x);
const std::size_t idx = threadIdx.y + (blockIdx.y * blockDim.y);

if (idx < local_indices_size && level < levels)
{
auto const local_index = local_indices[idx];
buffer[idx * buffer_index_stride + level] = values[local_index * index_stride + level];
}
}

template<typename T>
__global__ void
pack_kernel_levels_last(const T* values, const std::size_t local_indices_size,
const std::size_t* local_indices, const std::size_t levels, T* buffer,
const std::size_t level_stride, const std::size_t buffer_level_stride)
{
const std::size_t idx = threadIdx.x + (blockIdx.x * blockDim.x);
if (idx < local_indices_size)
const std::size_t level = threadIdx.y + (blockIdx.y * blockDim.y);

if (idx < local_indices_size && level < levels)
{
for (std::size_t level = 0; level < levels; ++level)
{
buffer[idx * buffer_index_stride + level * buffer_level_stride] = values[local_indices[idx] * index_stride + level * level_stride];
}
auto const local_index = local_indices[idx];
buffer[idx + level * buffer_level_stride] = values[local_index + level * level_stride];
}
}

template<typename T>
__global__ void
unpack_kernel(const T* buffer, const std::size_t local_indices_size,
unpack_kernel_levels_first(const T* buffer, const std::size_t local_indices_size,
const std::size_t* local_indices, const std::size_t levels, T* values,
const std::size_t index_stride, const std::size_t level_stride,
const std::size_t buffer_index_stride, const std::size_t buffer_level_stride)

const std::size_t index_stride, const std::size_t buffer_index_stride)
{
const std::size_t level = threadIdx.x + (blockIdx.x * blockDim.x);
const std::size_t idx = threadIdx.y + (blockIdx.y * blockDim.y);

if (idx < local_indices_size && level < levels)
{
auto const local_index = local_indices[idx];
values[local_index * index_stride + level] = buffer[idx * buffer_index_stride + level];
}
}

template<typename T>
__global__ void
unpack_kernel_levels_last(const T* buffer, const std::size_t local_indices_size,
const std::size_t* local_indices, const std::size_t levels, T* values,
const std::size_t level_stride, const std::size_t buffer_level_stride)
{
const std::size_t idx = threadIdx.x + (blockIdx.x * blockDim.x);
if (idx < local_indices_size)
const std::size_t level = threadIdx.y + (blockIdx.y * blockDim.y);

if (idx < local_indices_size && level < levels)
{
for (std::size_t level = 0; level < levels; ++level)
{
values[local_indices[idx] * index_stride + level * level_stride] = buffer[idx * buffer_index_stride + level * buffer_level_stride];
}
auto const local_index = local_indices[idx];
values[local_index + level * level_stride] = buffer[idx + level * buffer_level_stride];
}
}
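The two kernel variants keep the fastest-varying buffer dimension on `threadIdx.x` so that neighbouring threads touch contiguous buffer elements: levels for the levels-first layout, mesh indices for the levels-last layout. A host-side reference of the same addressing, useful as a sanity check (the strides follow the kernels above; names are illustrative):

```cpp
#include <cstddef>
#include <vector>

// Mirrors pack_kernel_levels_first: buffer is laid out [idx][level],
// values are addressed with an index stride (levels contiguous per index).
template<typename T>
void pack_levels_first_host(const T* values, const std::vector<std::size_t>& local_indices,
    std::size_t levels, std::size_t index_stride, T* buffer)
{
    for (std::size_t idx = 0; idx < local_indices.size(); ++idx)
        for (std::size_t level = 0; level < levels; ++level)
            buffer[idx * levels + level] = values[local_indices[idx] * index_stride + level];
}

// Mirrors pack_kernel_levels_last: buffer is laid out [level][idx],
// values are addressed with a level stride (indices contiguous per level).
template<typename T>
void pack_levels_last_host(const T* values, const std::vector<std::size_t>& local_indices,
    std::size_t levels, std::size_t level_stride, T* buffer)
{
    const std::size_t buffer_level_stride = local_indices.size();
    for (std::size_t level = 0; level < levels; ++level)
        for (std::size_t idx = 0; idx < local_indices.size(); ++idx)
            buffer[idx + level * buffer_level_stride] =
                values[local_indices[idx] + level * level_stride];
}
```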

@@ -522,7 +554,8 @@ class data_descriptor<gpu, DomainId, Idx, T>
* @param outer_stride outer dimension's stride measured in number of elements of type T (special value 0: no padding)
* @param device_id device id*/
data_descriptor(const domain_descriptor_type& domain, value_type* field,
std::size_t levels = 1u, bool levels_first = true, std::size_t outer_stride = 0u, device_id_type device_id = arch_traits<arch_type>::current_id())
std::size_t levels = 1u, bool levels_first = true, std::size_t outer_stride = 0u,
device_id_type device_id = arch_traits<arch_type>::current_id())
: m_device_id{device_id}
, m_domain_id{domain.domain_id()}
, m_domain_size{domain.size()}
@@ -549,34 +582,86 @@ class data_descriptor<gpu, DomainId, Idx, T>
template<typename IndexContainer>
void pack(value_type* buffer, const IndexContainer& c, void* stream_ptr)
{
const dim3 threads_per_block(GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X,
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y);

for (const auto& is : c)
{
const int n_blocks =
static_cast<int>(std::ceil(static_cast<double>(is.local_indices().size()) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK));
const std::size_t buffer_index_stride = m_levels_first ? m_levels : 1u;
const std::size_t buffer_level_stride = m_levels_first ? 1u : is.local_indices().size();
pack_kernel<value_type><<<n_blocks, GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK,
0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(m_values,
is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
m_index_stride, m_level_stride, buffer_index_stride, buffer_level_stride);
if (m_levels_first)
{
const int blocks_levels = static_cast<int>(
std::ceil(static_cast<double>(m_levels) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
const int blocks_indices = static_cast<int>(
std::ceil(static_cast<double>(is.local_indices().size()) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));

const dim3 blocks(blocks_levels, blocks_indices);

pack_kernel_levels_first<value_type><<<blocks, threads_per_block, 0,
*(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(m_values,
is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
m_index_stride, m_levels);
}
else
{
const int blocks_indices = static_cast<int>(
std::ceil(static_cast<double>(is.local_indices().size()) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
const int blocks_levels = static_cast<int>(
std::ceil(static_cast<double>(m_levels) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));

const dim3 blocks(blocks_indices, blocks_levels);

pack_kernel_levels_last<value_type><<<blocks, threads_per_block, 0,
*(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(m_values,
is.local_indices().size(), is.local_indices().data(), m_levels, buffer,
m_level_stride, is.local_indices().size());
}
}
}

template<typename IndexContainer>
void unpack(const value_type* buffer, const IndexContainer& c, void* stream_ptr)
{
const dim3 threads_per_block(GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X,
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y);

for (const auto& is : c)
{
const int n_blocks =
static_cast<int>(std::ceil(static_cast<double>(is.local_indices().size()) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK));
const std::size_t buffer_index_stride = m_levels_first ? m_levels : 1u;
const std::size_t buffer_level_stride = m_levels_first ? 1u : is.local_indices().size();
unpack_kernel<value_type><<<n_blocks, GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK,
0, *(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(buffer,
is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
m_index_stride, m_level_stride, buffer_index_stride, buffer_level_stride);
if (m_levels_first)
{
const int blocks_levels = static_cast<int>(
std::ceil(static_cast<double>(m_levels) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
const int blocks_indices = static_cast<int>(
std::ceil(static_cast<double>(is.local_indices().size()) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));

const dim3 blocks(blocks_levels, blocks_indices);

unpack_kernel_levels_first<value_type><<<blocks, threads_per_block, 0,
*(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(buffer,
is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
m_index_stride, m_levels);
}
else
{
const int blocks_indices = static_cast<int>(
std::ceil(static_cast<double>(is.local_indices().size()) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X));
const int blocks_levels = static_cast<int>(
std::ceil(static_cast<double>(m_levels) /
GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y));

const dim3 blocks(blocks_indices, blocks_levels);

unpack_kernel_levels_last<value_type><<<blocks, threads_per_block, 0,
*(reinterpret_cast<cudaStream_t*>(stream_ptr))>>>(buffer,
is.local_indices().size(), is.local_indices().data(), m_levels, m_values,
m_level_stride, is.local_indices().size());
}
}
}
};
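For both variants the launch grid is two-dimensional: the x dimension covers whichever of (levels, indices) the kernel maps to `threadIdx.x` and the y dimension covers the other, so the per-thread level loop of the old kernels is gone. A sketch of the launch-size computation used above, written with integer ceiling division (equivalent to the `std::ceil` expressions in the diff; names are illustrative):

```cpp
#include <cuda_runtime.h>
#include <cstddef>

// Block shape used by the pack/unpack launches (32 x 8 threads).
constexpr unsigned block_x = 32; // GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_X
constexpr unsigned block_y = 8;  // GHEX_UNSTRUCTURED_SERIALIZATION_THREADS_PER_BLOCK_Y

inline unsigned div_up(std::size_t n, unsigned d)
{
    return static_cast<unsigned>((n + d - 1) / d);
}

// levels-first: x runs over levels, y over mesh indices
inline dim3 grid_levels_first(std::size_t levels, std::size_t num_indices)
{
    return dim3(div_up(levels, block_x), div_up(num_indices, block_y));
}

// levels-last: x runs over mesh indices, y over levels
inline dim3 grid_levels_last(std::size_t levels, std::size_t num_indices)
{
    return dim3(div_up(num_indices, block_x), div_up(levels, block_y));
}
```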