Commit

StaggeredAllGather introduced as MPI pattern

kubagalecki committed Jan 20, 2024
1 parent 0b79105 commit f6eabc6
Showing 7 changed files with 107 additions and 168 deletions.
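This commit replaces four hand-rolled copies of a double-buffered broadcast loop, spread across DofIntervals.hpp, NodeCondensation.hpp, and SparsityGraph.hpp, with a shared util::staggeredAllGather helper. The helper itself presumably lives in l3ster/util/Algorithm.hpp (the include added alongside the new calls), which is not expanded on this page. The sketch below is a plausible reconstruction from the call sites and the removed loops, not the actual implementation; the namespace, the exact signature, the size exchange via the new MpiComm::allGather, and the std::vector staging buffers are all assumptions.

#include "l3ster/comm/MpiComm.hpp"

#include <algorithm>
#include <cstddef>
#include <ranges>
#include <span>
#include <type_traits>
#include <vector>

namespace lstr::util
{
template < std::ranges::contiguous_range R, typename F >
void staggeredAllGather(const MpiComm& comm, R&& my_data, F&& process)
{
    using T = std::ranges::range_value_t< std::remove_cvref_t< R > >;
    const auto my_span   = std::span< const T >{std::ranges::data(my_data), std::ranges::size(my_data)};
    const int  comm_size = comm.getSize();
    const int  my_rank   = comm.getRank();

    // Every rank learns every message size, so each broadcast can use an exactly sized buffer
    auto sizes = std::vector< std::size_t >(static_cast< std::size_t >(comm_size));
    comm.allGather(std::views::single(my_span.size()), sizes.begin());
    const std::size_t max_size = *std::ranges::max_element(sizes);

    auto comm_buf = std::vector< T >(max_size); // buffer with a broadcast in flight
    auto proc_buf = std::vector< T >(max_size); // buffer being consumed by the callback

    const auto post_bcast = [&](int root) {
        if (root == my_rank)
            std::ranges::copy(my_span, comm_buf.begin());
        return comm.broadcastAsync(std::span{comm_buf.data(), sizes[static_cast< std::size_t >(root)]}, root);
    };
    const auto consume = [&](std::vector< T >& buf, int sender) {
        process(std::span< const T >{buf.data(), sizes[static_cast< std::size_t >(sender)]}, sender);
    };

    // Staggered loop: rank r's broadcast overlaps with processing rank (r - 1)'s data
    auto request = post_bcast(0);
    for (int root = 1; root < comm_size; ++root)
    {
        request.wait();
        std::swap(comm_buf, proc_buf);
        request = post_bcast(root);
        consume(proc_buf, root - 1);
    }
    request.wait();
    consume(comm_buf, comm_size - 1);
}
} // namespace lstr::util

At the call sites below, the payload is the rank-local serialized DOF intervals, the owned active nodes, the active ghost nodes, or the ghost node list, and the callback always takes the received span plus the sending rank.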
30 changes: 18 additions & 12 deletions include/l3ster/comm/MpiComm.hpp
@@ -117,8 +117,6 @@ class MpiComm
[[nodiscard]] int getError() const { return m_status.MPI_ERROR; }

private:
auto getHandle() -> MPI_Status* { return &m_status; }

MPI_Status m_status;
};
static_assert(std::is_standard_layout_v< Status >);
@@ -128,6 +126,7 @@ class MpiComm
public:
friend class MpiComm;

Request() = default;
Request(const Request&) = delete;
Request& operator=(const Request&) = delete;
inline Request(Request&&) noexcept;
@@ -154,11 +153,8 @@
requires std::same_as< std::ranges::range_value_t< RequestRange >, Request >;

private:
Request() = default;
int waitImpl() noexcept { return MPI_Wait(&m_request, MPI_STATUS_IGNORE); }

auto getHandle() -> MPI_Request* { return &m_request; }

MPI_Request m_request = MPI_REQUEST_NULL;
};
static_assert(std::is_standard_layout_v< Request >);
@@ -219,6 +215,8 @@ class MpiComm
void allReduce(Data&& data, It out_it, MPI_Op op) const;
template < comm::MpiBuf_c Data, comm::MpiOutputIterator_c< Data > It >
void gather(Data&& data, It out_it, int root) const;
template < comm::MpiBuf_c Data, comm::MpiOutputIterator_c< Data > It >
void allGather(Data&& data, It out_it) const;
template < comm::MpiBuf_c Data >
void broadcast(Data&& data, int root) const;

@@ -352,7 +350,7 @@ auto MpiComm::FileHandle::readAtAsync(Data&& read_range, MPI_Offset offset) cons
{
const auto [datatype, buf_begin, buf_size] = comm::parseMpiBuf(read_range);
MpiComm::Request request;
L3STER_INVOKE_MPI(MPI_File_iread_at, m_file, offset, buf_begin, buf_size, datatype, request.getHandle());
L3STER_INVOKE_MPI(MPI_File_iread_at, m_file, offset, buf_begin, buf_size, datatype, &request.m_request);
return request;
}

@@ -367,7 +365,7 @@ auto MpiComm::FileHandle::writeAtAsync(Data&& write_range, MPI_Offset offset) co
std::ranges::data(write_range),
util::exactIntegerCast< int >(std::ranges::size(write_range)),
datatype,
request.getHandle());
&request.m_request);
return request;
}

@@ -422,7 +420,7 @@ auto MpiComm::probeAsync(int source, int tag) const -> std::pair< Status, bool >
{
auto retval = std::pair< Status, bool >{};
int flag{};
L3STER_INVOKE_MPI(MPI_Iprobe, source, tag, m_comm, &flag, retval.first.getHandle());
L3STER_INVOKE_MPI(MPI_Iprobe, source, tag, m_comm, &flag, &retval.first.m_status);
retval.second = flag;
return retval;
}
Expand Down Expand Up @@ -453,8 +451,16 @@ template < comm::MpiBuf_c Data, comm::MpiOutputIterator_c< Data > It >
void MpiComm::gather(Data&& data, It out_it, int root) const
{
const auto [datatype, buf_begin, buf_size] = comm::parseMpiBuf(data);
L3STER_INVOKE_MPI(
MPI_Gather, buf_begin, buf_size, datatype, std::addressof(*out_it), buf_size, datatype, root, m_comm);
const auto out_ptr = std::addressof(*out_it);
L3STER_INVOKE_MPI(MPI_Gather, buf_begin, buf_size, datatype, out_ptr, buf_size, datatype, root, m_comm);
}

template < comm::MpiBuf_c Data, comm::MpiOutputIterator_c< Data > It >
void MpiComm::allGather(Data&& data, It out_it) const
{
const auto [datatype, buf_begin, buf_size] = comm::parseMpiBuf(data);
const auto out_ptr = std::addressof(*out_it);
L3STER_INVOKE_MPI(MPI_Allgather, buf_begin, buf_size, datatype, out_ptr, buf_size, datatype, m_comm);
}

template < comm::MpiBuf_c Data >
@@ -469,7 +475,7 @@ auto MpiComm::broadcastAsync(Data&& data, int root) const -> MpiComm::Request
{
const auto [datatype, buf_begin, buf_size] = comm::parseMpiBuf(data);
auto request = Request{};
L3STER_INVOKE_MPI(MPI_Ibcast, buf_begin, buf_size, datatype, root, m_comm, request.getHandle());
L3STER_INVOKE_MPI(MPI_Ibcast, buf_begin, buf_size, datatype, root, m_comm, &request.m_request);
return request;
}

@@ -487,7 +493,7 @@ auto MpiComm::allToAllAsync(SendBuf&& send_buf, RecvBuf&& recv_buf) const -> Mpi
const int n_elems = send_size / getSize();
auto request = Request{};
L3STER_INVOKE_MPI(
MPI_Ialltoall, send_begin, n_elems, send_type, recv_begin, n_elems, recv_type, m_comm, request.getHandle());
MPI_Ialltoall, send_begin, n_elems, send_type, recv_begin, n_elems, recv_type, m_comm, &request.m_request);
return request;
}

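For context, the new MpiComm::allGather declared above mirrors gather but delivers every rank's buffer to every rank. A hypothetical standalone usage follows; it assumes MpiComm can be constructed from an MPI_Comm and that MPI is initialized manually, both of which are assumptions about the library's setup conventions.

#include "l3ster/comm/MpiComm.hpp"

#include <cstddef>
#include <ranges>
#include <vector>

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);
    {
        const auto comm     = lstr::MpiComm{MPI_COMM_WORLD}; // assumed constructor from MPI_Comm
        const auto my_value = static_cast< std::size_t >(comm.getRank());
        auto       gathered = std::vector< std::size_t >(static_cast< std::size_t >(comm.getSize()));
        comm.allGather(std::views::single(my_value), gathered.begin()); // gathered[r] == r on every rank
    }
    MPI_Finalize();
    return 0;
}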
55 changes: 12 additions & 43 deletions include/l3ster/dofs/DofIntervals.hpp
@@ -2,6 +2,7 @@
#define L3STER_DOFS_DOFINTERVALS_HPP

#include "l3ster/dofs/NodeCondensation.hpp"
#include "l3ster/util/Algorithm.hpp"
#include "l3ster/util/BitsetManip.hpp"
#include "l3ster/util/Caliper.hpp"

@@ -82,12 +83,11 @@ void serializeDofIntervals(const node_interval_vector_t< n_fields >& inter
}

template < size_t n_fields >
void deserializeDofIntervals(const std::ranges::sized_range auto& serial_data, auto out_it)
requires std::same_as< std::ranges::range_value_t< std::decay_t< decltype(serial_data) > >, unsigned long long > and
std::output_iterator< decltype(out_it), node_interval_t< n_fields > >
void deserializeDofIntervals(std::span< const unsigned long long > serial_data,
std::output_iterator< node_interval_t< n_fields > > auto out_it)
{
constexpr auto n_ulls = util::bitsetNUllongs< n_fields >();
for (auto data_it = std::ranges::begin(serial_data); data_it != std::ranges::end(serial_data);)
for (auto data_it = serial_data.begin(); data_it != serial_data.end();)
{
auto delims = std::array< n_id_t, 2 >{};
auto serial_fieldcov = std::array< unsigned long long, n_ulls >{};
@@ -101,53 +101,22 @@ template < size_t n_fields >
auto gatherGlobalDofIntervals(const MpiComm& comm, const node_interval_vector_t< n_fields >& local_intervals)
-> node_interval_vector_t< n_fields >
{
constexpr size_t serial_interval_size = util::bitsetNUllongs< n_fields >() + 2;
const size_t n_intervals_local = local_intervals.size();
const auto comm_size = comm.getSize();
const auto my_rank = comm.getRank();

const size_t max_n_intervals_global = std::invoke([&] {
size_t retval{};
comm.allReduce(std::views::single(n_intervals_local), &retval, MPI_MAX);
return retval;
});
const size_t max_msg_size = max_n_intervals_global * serial_interval_size + 1u;
auto serial_local_intervals = std::invoke([&] {
auto retval = util::ArrayOwner< unsigned long long >(max_msg_size);
retval.front() = n_intervals_local;
serializeDofIntervals(local_intervals, std::next(retval.begin()));
constexpr size_t serial_interval_size = util::bitsetNUllongs< n_fields >() + 2;
const auto my_rank = comm.getRank();
const auto serialized_local_intervals = std::invoke([&] {
auto retval = util::ArrayOwner< unsigned long long >(serial_interval_size * local_intervals.size());
serializeDofIntervals(local_intervals, retval.begin());
return retval;
});

node_interval_vector_t< n_fields > retval;
retval.reserve(comm_size * max_n_intervals_global);
auto proc_buf = util::ArrayOwner< unsigned long long >(max_msg_size);
const auto process_data = [&](int sender_rank) {
retval.reserve(comm.getSize() * local_intervals.size());
const auto process_received = [&](std::span< const unsigned long long > received_intervals, int sender_rank) {
if (sender_rank != my_rank)
{
const auto received_intervals =
proc_buf | std::views::drop(1) | std::views::take(proc_buf.front() * serial_interval_size);
deserializeDofIntervals< n_fields >(received_intervals, std::back_inserter(retval));
}
else
std::ranges::copy(local_intervals, std::back_inserter(retval));
};

auto msg_buf =
my_rank == 0 ? std::move(serial_local_intervals) : util::ArrayOwner< unsigned long long >(max_msg_size);
auto request = comm.broadcastAsync(msg_buf, 0);
for (int root_rank = 1; root_rank < comm_size; ++root_rank)
{
request.wait();
std::swap(msg_buf, proc_buf);
if (my_rank == root_rank)
msg_buf = std::move(serial_local_intervals);
request = comm.broadcastAsync(msg_buf, root_rank);
process_data(root_rank - 1);
}
request.wait();
std::swap(msg_buf, proc_buf);
process_data(comm_size - 1);
util::staggeredAllGather(comm, std::span{serialized_local_intervals}, process_received);
return retval;
}

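A quick size check for the exact-size send buffer used above: each interval serializes as two node delimiters followed by the field-coverage bitset words, so the buffer holds serial_interval_size words per local interval and no longer carries the leading count or max-size padding of the old code. The numbers below assume bitsetNUllongs rounds the field count up to 64-bit words, which is an assumption about that utility.

#include <cstddef>

constexpr std::size_t n_fields      = 8;                    // hypothetical field count
constexpr std::size_t n_ulls        = (n_fields + 63) / 64; // assumed bitsetNUllongs< 8 >() == 1
constexpr std::size_t words_per_ivl = n_ulls + 2;           // [lo, hi, fieldcov words...] == 3 words
constexpr std::size_t n_intervals   = 4;                    // hypothetical local interval count
static_assert(words_per_ivl * n_intervals == 12);           // exactly 12 words are sent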
98 changes: 20 additions & 78 deletions include/l3ster/dofs/NodeCondensation.hpp
@@ -81,77 +81,36 @@ auto getActiveNodes(const mesh::MeshPartition< orders... >& mesh,
return retval;
}

template < std::ranges::range Range >
auto packRangeWithSizeForComm(Range&& range, size_t alloc_size)
-> util::ArrayOwner< std::ranges::range_value_t< Range > >
requires lstr::comm::MpiBuiltinType_c< std::ranges::range_value_t< Range > > and
std::integral< std::ranges::range_value_t< Range > >
{
using range_value_t = std::ranges::range_value_t< decltype(range) >;
const size_t range_size = std::ranges::distance(range);
const auto range_size_to_pack = util::exactIntegerCast< range_value_t >(range_size);
auto retval = util::ArrayOwner< range_value_t >(alloc_size);
retval.front() = range_size_to_pack;
std::ranges::copy(std::forward< decltype(range) >(range), std::next(retval.begin()));
return retval;
}

template < std::integral T >
auto unpackRangeFromComm(util::ArrayOwner< T >& message)
{
return message | std::views::drop(1) | std::views::take(static_cast< size_t >(message.front()));
}

template < el_o_t... orders >
auto computeCondensedActiveNodeIds(const MpiComm& comm,
const mesh::MeshPartition< orders... >& mesh,
const std::vector< n_id_t >& uncondensed_active_nodes) -> std::vector< n_id_t >
{
const int my_rank = comm.getRank();
const int comm_size = comm.getSize();

// The condensation algorithm requires that each uncondensed active node be owned by exactly one partition. A node
// may be inactive on the partition which owns it. Such nodes must therefore first be claimed by a different
// partition.
const auto owned_active_nodes = std::invoke([&] {
const auto owned_active_nodes = std::invoke([&] {
auto retval = std::vector< n_id_t >{};
retval.reserve(uncondensed_active_nodes.size());
std::ranges::copy_if(uncondensed_active_nodes, std::back_inserter(retval), mesh.getOwnedNodePredicate());
retval.shrink_to_fit();
return retval;
});
const auto max_msg_size = std::invoke([&] {
size_t retval{};
comm.allReduce(std::views::single(owned_active_nodes.size() + 1u), &retval, MPI_MAX);
return retval;
});
auto owned_active_nodes_msg = packRangeWithSizeForComm(owned_active_nodes, max_msg_size);

std::vector< n_id_t > retval(uncondensed_active_nodes.size());
if (comm_size == 1)
if (comm.getSize() == 1)
{
std::iota(retval.begin(), retval.end(), 0ul);
return retval;
}
util::ArrayOwner< n_id_t > proc_buf(max_msg_size), comm_buf(max_msg_size);
const auto process_received_active_ids = [&](int sender_rank) {
const auto off_rank_nodes = unpackRangeFromComm(sender_rank == my_rank ? owned_active_nodes_msg : proc_buf);
for (size_t i = 0; auto& cond_id : retval)
cond_id += std::distance(off_rank_nodes.begin(),
std::ranges::lower_bound(off_rank_nodes, uncondensed_active_nodes[i++]));
const auto process_received_active_ids = [&](std::span< const n_id_t > off_rank_nodes, int) {
for (size_t i = 0; i != retval.size(); ++i)
{
const auto iter = std::ranges::lower_bound(off_rank_nodes, uncondensed_active_nodes[i]);
retval[i] += std::distance(off_rank_nodes.begin(), iter);
}
};

auto request = comm.broadcastAsync(my_rank == 0 ? owned_active_nodes_msg : comm_buf, 0);
for (int sender_rank = 1; sender_rank < comm_size; ++sender_rank)
{
request.wait();
std::swap(proc_buf, comm_buf);
request = comm.broadcastAsync(my_rank == sender_rank ? owned_active_nodes_msg : comm_buf, sender_rank);
process_received_active_ids(sender_rank - 1);
}
request.wait();
std::swap(proc_buf, comm_buf);
process_received_active_ids(comm_size - 1);
util::staggeredAllGather(comm, std::span{owned_active_nodes}, process_received_active_ids);
return retval;
}

@@ -160,37 +119,20 @@ void activateOwned(const MpiComm& comm,
const mesh::MeshPartition< orders... >& mesh,
std::vector< n_id_t >& my_active_nodes)
{
const int my_rank = comm.getRank();
const int comm_size = comm.getSize();
auto ghost_nodes = std::vector< n_id_t >{};
ghost_nodes.reserve(my_active_nodes.size());
for (auto n : my_active_nodes | std::views::filter(mesh.getGhostNodePredicate()))
ghost_nodes.push_back(n);
const auto max_ghost_nodes = std::invoke([&] {
size_t retval{};
comm.allReduce(std::views::single(ghost_nodes.size()), &retval, MPI_MAX);
const int my_rank = comm.getRank();
const auto my_active_ghost_nodes = std::invoke([&] {
auto retval = std::vector< n_id_t >{};
retval.reserve(my_active_nodes.size());
std::ranges::copy_if(my_active_nodes, std::back_inserter(retval), mesh.getGhostNodePredicate());
return retval;
});
const auto process_offrank_active_nodes = [&](util::ArrayOwner< n_id_t >& active_nodes) {
for (auto active : unpackRangeFromComm(active_nodes))
if (mesh.isOwnedNode(active) and not std::ranges::binary_search(my_active_nodes, active))
my_active_nodes.insert(std::ranges::lower_bound(my_active_nodes, active), active);
const auto process_offrank_active_nodes = [&](std::span< const n_id_t > active_nodes, int send_rank) {
if (send_rank != my_rank)
for (auto active : active_nodes)
if (mesh.isOwnedNode(active) and not std::ranges::binary_search(my_active_nodes, active))
my_active_nodes.insert(std::ranges::lower_bound(my_active_nodes, active), active);
};
auto my_active_ghost_nodes = packRangeWithSizeForComm(ghost_nodes, max_ghost_nodes + 1);
auto comm_buf = util::ArrayOwner< n_id_t >(max_ghost_nodes + 1),
proc_buf = util::ArrayOwner< n_id_t >(max_ghost_nodes + 1);
auto request = comm.broadcastAsync(my_rank == 0 ? my_active_ghost_nodes : comm_buf, 0);
for (int send_rank = 1; send_rank != comm_size; ++send_rank)
{
request.wait();
std::swap(proc_buf, comm_buf);
request = comm.broadcastAsync(my_rank == send_rank ? my_active_ghost_nodes : comm_buf, send_rank);
if (my_rank != send_rank - 1)
process_offrank_active_nodes(proc_buf);
}
request.wait();
if (my_rank != comm_size - 1)
process_offrank_active_nodes(comm_buf);
util::staggeredAllGather(comm, std::span{my_active_ghost_nodes}, process_offrank_active_nodes);
my_active_nodes.shrink_to_fit();
}

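Judging by the four call sites in this commit, the callback passed to util::staggeredAllGather is invoked once per rank, in rank order, and also for the caller's own rank. That is why the lambdas in gatherGlobalDofIntervals, activateOwned, and computeInNeighborGhostNodes compare the sender rank against my_rank before touching the data, while computeCondensedActiveNodeIds deliberately processes its own contribution too. A sketch of that callback shape, with illustrative names:

const auto process = [&](std::span< const n_id_t > data, int sender_rank) {
    if (sender_rank == my_rank)
        return; // own contribution was already handled locally
    for (auto id : data)
        handleOffRankNode(id); // handleOffRankNode: hypothetical per-element handler
};
util::staggeredAllGather(comm, my_payload, process); // my_payload: illustrative contiguous send buffer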
38 changes: 8 additions & 30 deletions include/l3ster/glob_asm/SparsityGraph.hpp
@@ -3,6 +3,7 @@

#include "l3ster/dofs/DofsFromNodes.hpp"
#include "l3ster/dofs/MakeTpetraMap.hpp"
#include "l3ster/util/Algorithm.hpp"
#include "l3ster/util/Caliper.hpp"
#include "l3ster/util/CrsGraph.hpp"
#include "l3ster/util/DynamicBitset.hpp"
@@ -24,43 +25,20 @@ template < el_o_t... orders >
auto computeInNeighborGhostNodes(const MpiComm& comm, const mesh::MeshPartition< orders... >& mesh)
-> rank_to_nodes_map_t
{
const auto max_n_ghosts = std::invoke([&] {
const size_t my_n_ghosts = mesh.getGhostNodes().size();
size_t retval{};
comm.allReduce(std::span{&my_n_ghosts, 1}, &retval, MPI_MAX);
return retval;
});
util::ArrayOwner< n_id_t > send_buf(max_n_ghosts + 1), recv_buf(max_n_ghosts + 1), proc_buf(max_n_ghosts + 1);
send_buf.front() = mesh.getGhostNodes().size();
std::ranges::copy(mesh.getGhostNodes(), std::next(send_buf.begin()));

auto retval = rank_to_nodes_map_t{};
const auto process_recvd = [&](const util::ArrayOwner< n_id_t >& data_buf, int potential_nbr_rank) {
const auto rank_ghosts = std::span{std::next(data_buf.begin()), data_buf.front()};
const int my_rank = comm.getRank();
auto retval = rank_to_nodes_map_t{};
const auto process_nbr_ghosts = [&](std::span< const n_id_t > rank_ghosts, int potential_nbr_rank) {
if (potential_nbr_rank == my_rank)
return;
std::vector< n_id_t >* nbr_ghosts_ptr = nullptr; // avoid repeated lookup
for (n_id_t ghost : rank_ghosts | std::views::filter(mesh.getOwnedNodePredicate()))
for (auto ghost : rank_ghosts | std::views::filter(mesh.getOwnedNodePredicate()))
{
if (not nbr_ghosts_ptr)
nbr_ghosts_ptr = std::addressof(retval[potential_nbr_rank]);
nbr_ghosts_ptr->push_back(ghost);
}
};

// Staggered all-gather pattern
const int my_rank = comm.getRank(), comm_size = comm.getSize();
auto request = comm.broadcastAsync(my_rank == 0 ? send_buf : recv_buf, 0);
for (int send_rank = 1; send_rank != comm_size; ++send_rank)
{
request.wait();
std::swap(recv_buf, proc_buf);
request = comm.broadcastAsync(my_rank == send_rank ? send_buf : recv_buf, send_rank);
if (my_rank != send_rank - 1)
process_recvd(proc_buf, send_rank - 1);
}
request.wait();
if (my_rank != comm_size - 1)
process_recvd(recv_buf, comm_size - 1);

util::staggeredAllGather(comm, mesh.getGhostNodes(), process_nbr_ghosts);
return retval;
}

(remaining 3 changed files not shown)
