Use pika's transform_mpi and polling support #1125

Draft: wants to merge 5 commits into base: master

Conversation

@biddisco (Collaborator) commented Apr 29, 2024

Fixes #536.

This is a squashed commit containing multiple changes

completion_modes:
pika supports several completion modes that can be used as an alternative to the dlaf::transformMPI mechanism, which uses yield_while to wait on an MPI request.

The completion mode can be selected via the environment variable PIKA_MPI_COMPLETION_MODE=<numeric value>;
by default pika selects the mode chosen by the pika/DLA-Future developers as giving good results across a broad range of use cases.

polling:
The pika polling loop may test for one or multiple request completions on each iteration through the scheduling loop. The environment variable
PIKA_MPI_POLLING_SIZE=<numeric value> (default 8)
may be used to vary the polling size (typically the default value can be used without any need to tune it).
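
Purely as an illustration, these settings could also be overridden from inside a test program instead of the job script; this is a sketch only (the values are examples, setenv is POSIX, and it assumes pika reads the variables when its MPI support is initialized):

    #include <cstdlib>

    // Example-only values: pick a specific completion mode and test up to 16
    // requests per scheduling-loop iteration. Must run before pika initializes
    // its MPI support, otherwise the defaults apply.
    void set_pika_mpi_tuning() {
      setenv("PIKA_MPI_COMPLETION_MODE", "1", /*overwrite=*/1);
      setenv("PIKA_MPI_POLLING_SIZE", "16", /*overwrite=*/1);
    }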

mpi pool: pika will create the mpi pool if the completion mode has the pool flag set; the user only needs to call the pool creation function during the pika::init setup phase.
Cleanup of the pool on shutdown is also handled automatically.

The user should use pika::mpi::pool_name instead of the raw "mpi" string; mpi pool management has been deferred to pika::mpi.
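
A rough sketch of that setup, reusing the names from the init.cpp changes in this PR (create_pool, get_pool_name, pool_create_mode and the dlaf:no-mpi-pool option); the exact signatures follow this PR's pika branch and may change:

    void initResourcePartitionerHandler(pika::resource::partitioner&,
                                        const pika::program_options::variables_map& vm) {
      namespace mpix = pika::mpi::experimental;
      // Let pika decide whether a dedicated mpi pool is needed, unless the user
      // explicitly opted out; pool cleanup on shutdown is handled by pika.
      auto mode = vm["dlaf:no-mpi-pool"].as<bool>() ? mpix::pool_create_mode::force_no_create
                                                    : mpix::pool_create_mode::pika_decides;
      mpix::create_pool(mpix::get_pool_name(), mode);
    }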

Change: the transform MPI code no longer needs to return an MPI_SUCCESS value; the return value from mpi_transform has been removed to simplify the code, and an error is reported via the sender's set_error if any MPI call fails. Should mpi_transform calls that return another value be required, this code can be reinstated.
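
As a usage sketch (mirroring the updated Bcast test further down; it assumes MPI is initialized and pika's MPI polling is running), a call through pika's transform_mpi no longer needs a trailing then to check the MPI return code:

    #include <mpi.h>
    #include <pika/execution.hpp>
    #include <pika/mpi.hpp>

    #include <cstddef>
    #include <vector>

    // Broadcast a buffer through pika's transform_mpi. A failing MPI call is
    // reported on the sender's error channel, so no then([](int e) { ... })
    // stage is needed to check the return value.
    void broadcast_example(MPI_Comm comm, int root_rank) {
      namespace ex = pika::execution::experimental;
      namespace mpi = pika::mpi::experimental;
      using pika::this_thread::experimental::sync_wait;

      int rank = 0;
      MPI_Comm_rank(comm, &rank);
      int size = 1000;
      double val = (rank == root_rank) ? 4.2 : 1.2;
      std::vector<double> buf(static_cast<std::size_t>(size), val);

      sync_wait(ex::just(buf.data(), size, MPI_DOUBLE, root_rank, comm) |
                mpi::transform_mpi(MPI_Ibcast));
    }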

@biddisco (Collaborator, Author):
Note that this PR depends on the mpi_polling PR in pika, so it probably won't work with pika master.

@msimberg (Collaborator):
I've just merged pika-org/pika#1102 so the changes are on pika main now.

Comment on lines 14 to 15
#ifdef EXTRA_MPI_TYPES_DEBUGGING
#include <pika/debugging/demangle_helper.hpp>
#endif
#include <pika/debugging/print.hpp>
Collaborator:
I understand that these can be useful for debugging, but I would prefer if we leave these out at this point, especially since you're planning to change the debugging/logging facilities in pika sometime soon.

Comment on lines +26 to +25
//
#include <pika/mpi.hpp>
Collaborator:
Move this up to after pika/execution.hpp.

Collaborator:
Suggested change
//
#include <pika/mpi.hpp>

(internal::consumeCommunicatorWrapper(ts), ...);
pika::util::yield_while([req]() { return !mpid::poll_request(req); });
}
}
Collaborator:
I suppose we can keep this on this branch until we've verified that going through pika's transform_mpi is equivalent or better, but then I'd remove this? If we can avoid it we should not use things from pika::mpi::detail.

Collaborator:
Let's remove this (the old yield-while version) completely now that we know pika's polling works fine.

Comment on lines 123 to 141
if (mpi::get_completion_mode() >= static_cast<int>(mpid::handler_mode::unspecified)) {
  auto snd1 =
      ex::transfer(std::forward<Sender>(sender), dlaf::internal::getMPIScheduler()) |
      ex::then(dlaf::common::internal::ConsumeRvalues{MPIYieldWhileCallHelper{std::forward<F>(f)}});
  return ex::make_unique_any_sender(std::move(snd1));
}
else {
#ifdef EXTRA_MPI_TYPES_DEBUGGING
  auto snd1 =
      std::forward<Sender>(sender) |
      ex::let_value([=, f = std::move(f)]<typename... LArgs>(LArgs&&... largs) {
        PIKA_DETAIL_DP(dla_debug<2>, debug(str<>("Args to MPI fn\n"),
                                           pika::debug::print_type<LArgs...>(", "), "\nValues\n"));
        return ex::just(std::move(largs)...) |
               mpi::transform_mpi(dlaf::common::internal::ConsumeRvalues{MPICallHelper{std::move(f)}});
      });
  return ex::make_unique_any_sender(std::move(snd1));
#else
  PIKA_DETAIL_DP(dla_debug<5>, debug(str<>("MPI fn\n")));
  auto snd1 =
      std::forward<Sender>(sender) |
      mpi::transform_mpi(dlaf::common::internal::ConsumeRvalues{MPICallHelper{std::forward<F>(f)}});
  return ex::make_unique_any_sender(std::move(snd1));
#endif
Collaborator:
Same comment here: let's verify that pika's transform_mpi is as good as or better than DLAF's yield_while, and then get rid of DLAF's version? It'd also avoid an unnecessary type-erasure.

Comment on lines -150 to -173
/// A partially applied transformMPIDetach, with the callable object given, but
/// the predecessor sender missing. The predecessor sender is applied when
/// calling the operator| overload.
template <typename F>
class PartialTransformMPIDetach : private PartialTransformMPIBase<F> {
public:
  template <typename F_>
  PartialTransformMPIDetach(F_&& f) : PartialTransformMPIBase<F>{std::forward<F_>(f)} {}
  PartialTransformMPIDetach(PartialTransformMPIDetach&&) = default;
  PartialTransformMPIDetach(const PartialTransformMPIDetach&) = default;
  PartialTransformMPIDetach& operator=(PartialTransformMPIDetach&&) = default;
  PartialTransformMPIDetach& operator=(const PartialTransformMPIDetach&) = default;

  template <typename Sender>
  friend auto operator|(Sender&& sender, PartialTransformMPIDetach pa) {
    return pika::execution::experimental::start_detached(transformMPI(std::move(pa.f_),
                                                                      std::forward<Sender>(sender)));
  }
};

template <typename F>
PartialTransformMPIDetach(F&& f) -> PartialTransformMPIDetach<std::decay_t<F>>;

Collaborator:
I think we can remove these in a separate PR if they're unused. I can do so.

src/init.cpp Outdated
Comment on lines 102 to 116
// setup polling on default pool, enable exceptions and init mpi internals
pika::mpi::experimental::init(false, true);
pika::mpi::experimental::register_polling();

Collaborator:
This is not necessary. The CPU backend is always initialized and takes care of registering for polling.

src/init.cpp Outdated
Comment on lines 108 to 111
pika::cuda::experimental::detail::register_polling(
pika::resource::get_thread_pool(pika::cuda::experimental::get_pool_name()));
Collaborator:
Can we remove this change from here and deal with CUDA separately?

Comment on lines +201 to +261
// cfg.mpi_pool = (pika::resource::pool_exists("mpi")) ? "mpi" : "default";

// Warn if not using MPI pool without --dlaf:no-mpi-pool
int mpi_initialized;
DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized));
if (mpi_initialized) {
int ntasks;
DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks));
if (ntasks != 1 && cfg.mpi_pool == "default" && !vm["dlaf:no-mpi-pool"].as<bool>()) {
std::cerr << "Warning! DLA-Future is not using the \"mpi\" pika thread pool for "
"MPI communication but --dlaf:no-mpi-pool is not set. This may "
"indicate a bug in DLA-Future or pika. Performance may be degraded."
<< std::endl;
}
}
// int mpi_initialized;
// DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized));
// if (mpi_initialized) {
// int ntasks;
// DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks));
// if (ntasks != 1 && cfg.mpi_pool == "default" && !vm["dlaf:no-mpi-pool"].as<bool>()) {
// std::cerr << "Warning! DLA-Future is not using the \"mpi\" pika thread pool for "
// "MPI communication but --dlaf:no-mpi-pool is not set. This may "
// "indicate a bug in DLA-Future or pika. Performance may be degraded."
// << std::endl;
// }
// }
Collaborator:
Assuming we can now sufficiently control the MPI pool creation through pika, you can remove these options completely.

Collaborator:
I'm not sure which options are available to initialize the MPI pool in pika,
but I will keep the warning in case non-optimal options were chosen.

src/init.cpp Outdated
Comment on lines 377 to 402
void initResourcePartitionerHandler(pika::resource::partitioner&,
const pika::program_options::variables_map& vm) {
// Don't create the MPI pool if the user disabled it
namespace mpi = pika::mpi::experimental;
// Create the MPI pool if needed and unless the user disabled it
mpi::pool_create_mode pool_mode = mpi::pool_create_mode::pika_decides;
namespace mpi = pika::mpi::experimental;
if (vm["dlaf:no-mpi-pool"].as<bool>())
return;

// Don't create the MPI pool if there is a single process
int ntasks;
DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks));
if (ntasks == 1)
return;

// Disable idle backoff on the MPI pool
using pika::threads::scheduler_mode;
auto mode = scheduler_mode::default_mode;
mode = scheduler_mode(mode & ~scheduler_mode::enable_idle_backoff);

// Create a thread pool with a single core that we will use for all
// communication related tasks
rp.create_thread_pool("mpi", pika::resource::scheduling_policy::static_priority, mode);
rp.add_resource(rp.numa_domains()[0].cores()[0].pus()[0], "mpi");
pool_mode = mpi::pool_create_mode::force_no_create;

namespace mpix = pika::mpi::experimental;
// create a pool for mpi if necessary
mpix::create_pool(mpix::get_pool_name(), pool_mode);
Collaborator:
What do you think about moving this logic completely to pika and not initializing the MPI pool through the resource partitioner callback? In the long term I think it makes little sense to have DLA-Future be responsible for creating the MPI pool if the rest of the MPI functionality is in pika as well. The difficulty is: how does pika know if the MPI pool is needed or not (in the generic case, i.e. if it's not running for DLA-Future)... In the ideal case, doing the polling on the default pool would be as performant as a separate pool, and then we wouldn't need to bother with this. Perhaps worth coming back to this once we know a bit more about performance on Grace-Hopper?

DLAF_MPI_CHECK_ERROR(e1);
DLAF_MPI_CHECK_ERROR(e2);
}));
sync_wait(when_all(std::move(send), std::move(recv)) | then([]() {}));
Collaborator:
If we're really planning on leaving out the return value of the function passed to transform_mpi, you can remove the then from here (same further down). It serves no purpose.

Collaborator:
Suggested change
sync_wait(when_all(std::move(send), std::move(recv)) | then([]() {}));
sync_wait(when_all(std::move(send), std::move(recv)));

Comment on lines 97 to 106
if constexpr (std::is_void_v<result_type>) {
  std::move(f)(dlaf::common::internal::unwrap(ts)...);
  (internal::consumeCommunicatorWrapper(ts), ...);
}
else {
  auto r = std::move(f)(dlaf::common::internal::unwrap(ts)...);
  (internal::consumeCommunicatorWrapper(ts), ...);
  pika::util::yield_while(is_request_completed);
  return r;
Collaborator:
If we had a scope guard type of thing (https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/n4853.html#scopeguard.exit) in DLA-Future we could write this roughly as:

      scope_exit s{[&] { (internal::consumeCommunicatorWrapper(ts), ...); }};
      return std::move(f)(dlaf::common::internal::unwrap(ts)...);

without the need to branch for void and non-void cases.

I'm not expecting a change here, but I just wanted to write this down here as a reminder for when we get closer to merging this PR.
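
For reference, a minimal scope_exit along those lines could look like this (just a sketch; DLA-Future does not currently provide such a type):

    #include <utility>

    // Minimal scope guard: runs the stored callable when the object goes out of
    // scope, including when the guarded code exits via an exception.
    template <typename F>
    class scope_exit {
    public:
      explicit scope_exit(F f) : f_(std::move(f)) {}
      scope_exit(const scope_exit&) = delete;
      scope_exit& operator=(const scope_exit&) = delete;
      ~scope_exit() { f_(); }

    private:
      F f_;
    };

    // Deduction guide so `scope_exit s{[&] { ... }};` works as in the snippet above.
    template <typename F>
    scope_exit(F) -> scope_exit<F>;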

@msimberg marked this pull request as draft on Oct 14, 2024.
@msimberg changed the title from "Use pika's transform_mpi, polling and stream throttling support" to "Use pika's transform_mpi and polling support" on Oct 14, 2024.
This is a squashed commit containing multiple changes

completion_modes:
pika supports different completion modes that may be used
as an alternative to the dlaf::transformMPI mechanism
that uses yield_while to wait on an MPI request.

The completion modes may be set via the environment variable
PIKA_MPI_COMPLETION_MODE=<numeric value>
which by default will select the one chosen by pika/dlaf
developers known to give good results across a broad range of
use cases.

polling:
The pika polling loop may test for one or multiple request
completions on each iteration through the scheduling loop.
The environment variable
PIKA_MPI_POLLING_SIZE=<numeric value> (default 8)
may be used to vary the polling size (typically the default value
can be used without any need to tune it).

mpi pool: pika will create the mpi pool if the completion mode
has the pool flag set; the user only needs to call the pool creation
function during the pika::init setup phase.
Cleanup of the pool on shutdown is also handled automatically.

The user should use pika::mpi::pool_name instead of raw "mpi";
mpi pool management has been deferred to pika::mpi.

Change: the transform MPI code no longer needs to return an MPI_SUCCESS
value; the return value from mpi_transform has been removed to simplify
the code, and an error is reported via the sender's set_error if any MPI call fails.
Should mpi_transform calls that return another value be required, this code
can be reinstated.
A series of changes to pika::mpi have changed both the API and the
internal pool creation mechanism to simplify end-user access to the
transform_mpi features and the setup of mpi polling itself.
The latest pika::transform_mpi is enabled/disabled via the command line
and does not require explicit enabling via init_params.
@@ -12,6 +12,7 @@
#include <type_traits>
#include <utility>

#include <pika/debugging/print.hpp>
Collaborator:
Suggested change
#include <pika/debugging/print.hpp>
#include <pika/debugging/print.hpp>

Comment on lines +26 to +25
//
#include <pika/mpi.hpp>
Collaborator:
Suggested change
//
#include <pika/mpi.hpp>

@@ -12,6 +12,7 @@
#include <type_traits>
#include <utility>

#include <pika/debugging/print.hpp>
#include <pika/execution.hpp>
Collaborator:
Suggested change
#include <pika/execution.hpp>
#include <pika/execution.hpp>
#include <pika/mpi.hpp>

Comment on lines +26 to +29
//
#ifdef EXTRA_MPI_TYPES_DEBUGGING
#include <pika/debugging/demangle_helper.hpp>
#endif
Collaborator:
Suggested change
//
#ifdef EXTRA_MPI_TYPES_DEBUGGING
#include <pika/debugging/demangle_helper.hpp>
#endif

Comment on lines +33 to +35
template <int Level>
static pika::debug::detail::print_threshold<Level, 0> dla_debug("DLA_MPI");

Collaborator:
Suggested change
template <int Level>
static pika::debug::detail::print_threshold<Level, 0> dla_debug("DLA_MPI");

Comment on lines +63 to +65
if (pika::mpi::detail::environment::is_mpi_initialized()) {
pika::mpi::experimental::start_polling(pika::mpi::experimental::exception_mode::no_handler);
}
Collaborator:
Ideally, I'd have these removed (see pika-org/pika#1267 (comment)). If they stay I think this and the stop_polling should have the conditional removed:

Suggested change
if (pika::mpi::detail::environment::is_mpi_initialized()) {
pika::mpi::experimental::start_polling(pika::mpi::experimental::exception_mode::no_handler);
}
pika::mpi::experimental::start_polling(pika::mpi::experimental::exception_mode::no_handler);

There should be no situation where MPI isn't already initialized at this point (if anything we can add an assertion, but then it shouldn't be calling is_mpi_initialized from pika's detail namespace, but something from DLA-Future or MPI_Initialized directly).
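
For example, a rough sketch of such an assertion, calling MPI_Initialized directly instead of pika's detail namespace (plain assert used here only for illustration):

    #include <cassert>
    #include <mpi.h>

    // MPI must already be initialized by the time DLA-Future starts pika's
    // MPI polling; assert this instead of silently skipping start_polling.
    int is_init = 0;
    MPI_Initialized(&is_init);
    assert(is_init && "MPI must be initialized before starting pika's MPI polling");
    pika::mpi::experimental::start_polling(pika::mpi::experimental::exception_mode::no_handler);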

@@ -230,21 +244,21 @@ void updateConfiguration(const pika::program_options::variables_map& vm, configu
updateConfigurationValue(vm, cfg.umpire_device_memory_pool_initial_bytes,
"UMPIRE_DEVICE_MEMORY_POOL_INITIAL_BYTES",
"umpire-device-memory-pool-initial-bytes");
cfg.mpi_pool = (pika::resource::pool_exists("mpi")) ? "mpi" : "default";
// cfg.mpi_pool = (pika::resource::pool_exists("mpi")) ? "mpi" : "default";
Collaborator:
Suggested change
// cfg.mpi_pool = (pika::resource::pool_exists("mpi")) ? "mpi" : "default";

Comment on lines +250 to +261
// int mpi_initialized;
// DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized));
// if (mpi_initialized) {
// int ntasks;
// DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks));
// if (ntasks != 1 && cfg.mpi_pool == "default" && !vm["dlaf:no-mpi-pool"].as<bool>()) {
// std::cerr << "Warning! DLA-Future is not using the \"mpi\" pika thread pool for "
// "MPI communication but --dlaf:no-mpi-pool is not set. This may "
// "indicate a bug in DLA-Future or pika. Performance may be degraded."
// << std::endl;
// }
// }
Collaborator:
Suggested change
// int mpi_initialized;
// DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized));
// if (mpi_initialized) {
// int ntasks;
// DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks));
// if (ntasks != 1 && cfg.mpi_pool == "default" && !vm["dlaf:no-mpi-pool"].as<bool>()) {
// std::cerr << "Warning! DLA-Future is not using the \"mpi\" pika thread pool for "
// "MPI communication but --dlaf:no-mpi-pool is not set. This may "
// "indicate a bug in DLA-Future or pika. Performance may be degraded."
// << std::endl;
// }
// }

DLAF_MPI_CHECK_ERROR(e1);
DLAF_MPI_CHECK_ERROR(e2);
}));
sync_wait(when_all(std::move(send), std::move(recv)) | then([]() {}));
Collaborator:
Suggested change
sync_wait(when_all(std::move(send), std::move(recv)) | then([]() {}));
sync_wait(when_all(std::move(send), std::move(recv)));

@@ -65,8 +62,7 @@ TEST(Bcast, Polling) {
double val = (comm.rank() == root_rank) ? 4.2 : 1.2;
std::vector<double> buf(static_cast<std::size_t>(size), val);

sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast) |
then([](int e) { DLAF_MPI_CHECK_ERROR(e); }));
sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast) | then([]() {}));
Collaborator:
Suggested change
sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast) | then([]() {}));
sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast));

Labels: none yet
Projects: Status: In Progress
Development: successfully merging this pull request may close the issue "Use pika's transform_mpi in DLA-Future's transformMPI"
3 participants