Skip to content

Commit

Permalink
Assign GPUs to MPI ranks per node, allowing more flexible MPI configu…
Browse files Browse the repository at this point in the history
…rations

MPI ensembles can use multiple mpi ranks per node, evenly(ish) distributing GPUs across the ranks per shared memory system.
If more MPI ranks are used on a node than GPUs, additional ranks will do nothing and a warning is reported.

I.e. any number of mpi ranks can be launched, but only the sensible amount will be used.

If the user specifies device indices, they will be load balanced, otherwise all visible devices within the node will be balanced.

Only one rank per node sends the device string back for telemetry, others send back an empty string (while the assembleGPUsString method is expecting a message from each rank in the world.

If no valid cuda devices are provided, an exception is raised

Device allocation is implemented in a static method so it can be tested programatically, withotu launching the test N times with different MPI configurations.
  • Loading branch information
ptheywood committed Dec 12, 2023
1 parent 09d72ea commit bcc66be
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 67 deletions.
29 changes: 15 additions & 14 deletions examples/cpp/ensemble/src/main.cu
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,23 @@ int main(int argc, const char ** argv) {
* Check result for each log
*/
const std::map<unsigned int, flamegpu::RunLog> &logs = cuda_ensemble.getLogs();
unsigned int init_sum = 0, expected_init_sum = 0;
uint64_t result_sum = 0, expected_result_sum = 0;
if (!cuda_ensemble.Config().mpi || logs.size() > 0) {
unsigned int init_sum = 0, expected_init_sum = 0;
uint64_t result_sum = 0, expected_result_sum = 0;

for (const auto &[i, log] : logs) {
const int init = i/10;
const int init_offset = 1 - i/50;
expected_init_sum += init;
expected_result_sum += POPULATION_TO_GENERATE * init + init_offset * ((POPULATION_TO_GENERATE-1)*POPULATION_TO_GENERATE/2); // Initial agent values
expected_result_sum += POPULATION_TO_GENERATE * STEPS * i; // Agent values added by steps
const flamegpu::ExitLogFrame &exit_log = log.getExitLog();
init_sum += exit_log.getEnvironmentProperty<int>("init");
result_sum += exit_log.getEnvironmentProperty<int>("result");
for (const auto &[i, log] : logs) {
const int init = i/10;
const int init_offset = 1 - i/50;
expected_init_sum += init;
expected_result_sum += POPULATION_TO_GENERATE * init + init_offset * ((POPULATION_TO_GENERATE-1)*POPULATION_TO_GENERATE/2); // Initial agent values
expected_result_sum += POPULATION_TO_GENERATE * STEPS * i; // Agent values added by steps
const flamegpu::ExitLogFrame &exit_log = log.getExitLog();
init_sum += exit_log.getEnvironmentProperty<int>("init");
result_sum += exit_log.getEnvironmentProperty<int>("result");
}
printf("Ensemble init: %u, calculated init %u\n", expected_init_sum, init_sum);
printf("Ensemble result: %zu, calculated result %zu\n", expected_result_sum, result_sum);
}
printf("Ensemble init: %u, calculated init %u\n", expected_init_sum, init_sum);
printf("Ensemble result: %zu, calculated result %zu\n", expected_result_sum, result_sum);

/**
* Report if MPI was enabled
*/
Expand Down
71 changes: 65 additions & 6 deletions include/flamegpu/simulation/detail/MPIEnsemble.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <mpi.h>

#include <map>
#include <set>
#include <string>
#include <mutex>
#include <vector>
Expand Down Expand Up @@ -41,11 +42,11 @@ class MPIEnsemble {
/**
* The rank within the MPI shared memory communicator (i.e. the within the node)
*/
const int group_rank;
const int local_rank;
/**
* The size of the MPI shared memory communicator (i.e. the within the node)
*/
const int group_size;
const int local_size;
/**
* The total number of runs to be executed (only used for printing error warnings)
*/
Expand Down Expand Up @@ -86,16 +87,54 @@ class MPIEnsemble {
*/
void worldBarrier();
/**
* If world_rank!=0, send the local GPU string to world_rank==0 and return empty string
* If world_rank!=0 and local_rank == 0, send the local GPU string to world_rank==0 and return empty string
* If world_rank==0, receive GPU strings and assemble the full remote GPU string to be returned
*/
std::string assembleGPUsString();
/**
* Common function for handling local errors during MPI execution
* Needs the set of in-use devices, not the config specified list of devices
*/
void retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex,
std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail,
std::vector<AbstractSimRunner::ErrorDetail> &err_detail_local, int i);
std::vector<AbstractSimRunner::ErrorDetail> &err_detail_local, int i, std::set<int> devices);
/**
* Create the split MPI Communicator based on if the thread is participating in ensemble execution or not, based on the group rank and number of local GPUs.
* @param isParticipating If this rank is participating (i.e. it has a local device assigned)
* @return success of this method
*/
bool createParticipatingCommunicator(bool isParticipating);
/**
* Accessor method for if the rank is participating or not (i.e. the colour of the communicator split)
*/
int getRankIsParticipating() { return this->rank_is_participating; }
/**
* Accessor method for the size of the MPI communicator containing "participating" (or non-participating) ranks
*/
int getParticipatingCommSize() { return this->participating_size; }
/**
* Accessor method for the rank within the MPI communicator containing "participating" (or non-participating) ranks
*/
int getParticipatingCommRank() { return this->participating_rank; }

/**
* Static method to select devices for the current mpi rank, based on the provided list of devices.
* This static version exists so that it is testable.
* A non static version which queries the curernt mpi environment is also provided as a simpler interface
* @param devicesToSelectFrom set of device indices to use, provided from the config or initialised to contain all visible devices.
* @param local_size the number of mpi processes on the current shared memory system
* @param local_rank the current process' mpi rank within the shared memory system
* @return the gpus to be used by the current mpi rank, which may be empty.
*/
static std::set<int> devicesForThisRank(std::set<int> devicesToSelectFrom, int local_size, int local_rank);

/**
* Method to select devices for the current mpi rank, based on the provided list of devices.
* This non-static version calls the other overload with the current mpi size/ranks, i.e. this is the version that should be used.
* @param devicesToSelectFrom set of device indices to use, provided from the config or initialised to contain all visible devices.
* @return the gpus to be used by the current mpi rank, which may be empty.
*/
std::set<int> devicesForThisRank(std::set<int> devicesToSelectFrom);

private:
/**
Expand All @@ -119,13 +158,33 @@ class MPIEnsemble {
*/
static void initMPI();
/**
* Iterate config.devices to find the item at index j
* Iterate the provided set of devices to find the item at index j
* This doesn't use the config.devices as it may have been mutated based on the number of mpi ranks used.
*/
unsigned int getDeviceIndex(const int j);
unsigned int getDeviceIndex(const int j, std::set<int> devices);
/**
* MPI representation of AbstractSimRunner::ErrorDetail type
*/
const MPI_Datatype MPI_ERROR_DETAIL;
/**
* flag indicating if the current MPI rank is a participating rank (i.e. it has atleast one GPU it can use).
* This is not a const public member, as it can only be computed after world and local rank / sizes and local gpu count are known.
* This is used to form the colour in the participating communicator
*/
bool rank_is_participating;
/**
* An MPI communicator, split by whether the mpi rank is participating in simulation or not.
* non-participating ranks will have a commincator which only contains non-participating ranks, but these will never use the communicator
*/
MPI_Comm comm_participating;
/**
* The size of the MPI communicator containing "participating" (or non-participating) ranks
*/
int participating_size;
/**
* The rank within the MPI communicator containing "participating" (or non-participating) ranks
*/
int participating_rank;
};
} // namespace detail
} // namespace flamegpu
Expand Down
59 changes: 52 additions & 7 deletions src/flamegpu/simulation/CUDAEnsemble.cu
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
// Resize means we can setup logs during execution out of order, without risk of list being reallocated
run_logs.clear();
// Workout how many devices and runner we will be executing
// if MPI is enabled, This will throw exceptions if any rank has 0 GPUs visible, prior to device allocation preventing issues where rank 0 would not be participating.
int device_count = -1;
cudaError_t cudaStatus = cudaGetDeviceCount(&device_count);
if (cudaStatus != cudaSuccess) {
Expand All @@ -150,14 +151,26 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
}
}

// Select the actual devices to be used, based on user provided gpus, architecture compatibility, and optionally mpi ranks per node.
// For non-mpi builds / configurations, just use all the devices provided by the user / all visible devices (then check they are valid later)
// For MPI builds with mpi enabled, load balance the gpus across mpi ranks within the shared memory system. If there are more ranks than gpus, latter ranks will not participate.
std::set<int> devices;
// initialise the local devices set to be the non-mpi behaviour, using config.devices or all visible cuda devices
if (config.devices.size()) {
devices = config.devices;
} else {
// If no devices were specified by the user, use all visible devices but load balance if MPI is in use.
for (int i = 0; i < device_count; ++i) {
devices.emplace(i);
}
}
#ifdef FLAMEGPU_ENABLE_MPI
// if MPI is enabled at compile time, and mpi is enabled in the config, use the MPIEnsemble method to assign devices balanced across ranks
if (mpi != nullptr) {
devices = mpi->devicesForThisRank(devices);
}
#endif // ifdef FLAMEGPU_ENABLE_MPI

// Check that each device is capable, and init cuda context
for (auto d = devices.begin(); d != devices.end(); ++d) {
if (!detail::compute_capability::checkComputeCapability(*d)) {
Expand All @@ -172,6 +185,36 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
// Return to device 0 (or check original device first?)
gpuErrchk(cudaSetDevice(0));

// If there are no devices left (and mpi is not being used), we need to error as the work cannot be executed.
#ifdef FLAMEGPU_ENABLE_MPI
if (devices.size() == 0 && mpi == nullptr) {
THROW exception::InvalidCUDAdevice("FLAMEGPU2 has not been built with an appropraite compute capability for any devices, unable to continue\n");
}
#else // FLAMEGPU_ENABLE_MPI
if (devices.size() == 0) {
THROW exception::InvalidCUDAdevice("FLAMEGPU2 has not been built with an appropraite compute capability for any devices, unable to continue\n");
}
#endif // FLAMEGPU_ENABLE_MPI

#ifdef FLAMEGPU_ENABLE_MPI
// Once the number of devices per rank is known, we can create the actual communicator to be used during MPI, so we can warn/error as needed.
if (mpi != nullptr) {
// This rank is participating if it has atleast one device assigned to it.
// Rank 0 will be participating at this point, otherwise InvalidCUDAdevice would have been thrown
// This also implies the participating communicator cannot have a size of 0, as atleast one thread must be participating at this point, but throw in that case just in case.
bool communicatorCreated = mpi->createParticipatingCommunicator(devices.size() > 0);
// If the communicator failed to be created or is empty for any participating threads, throw. This should never occur.
if (!communicatorCreated || mpi->getParticipatingCommSize() == 0) {
THROW exception::EnsembleError("Unable to create MPI communicator. Ensure atleast one GPU is visible.\n");
}
// If the world size is not the participating size, issue a warning.that too many threads have been used.
if (mpi->world_rank == 0 && mpi->world_size != mpi->getParticipatingCommSize() && config.verbosity >= Verbosity::Default) {
fprintf(stderr, "Warning: MPI Ensemble launched with %d MPI ranks, but only %d ranks have GPUs assigned. %d ranks are unneccesary.\n", mpi->world_size, mpi->getParticipatingCommSize(), mpi->world_size - mpi->getParticipatingCommSize());
fflush(stderr);
}
}
#endif

const unsigned int TOTAL_RUNNERS = static_cast<unsigned int>(devices.size()) * config.concurrent_runs;

// Log Time (We can't use CUDA events here, due to device resets)
Expand Down Expand Up @@ -231,7 +274,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
int flag;
int mpi_runners_fin = 1; // Start at 1 because we have always already finished
// Wait for all runs to have been assigned, and all MPI runners to have been notified of fin
while (next_run < plans.size() || mpi_runners_fin < mpi->world_size) {
while (next_run < plans.size() || mpi_runners_fin < mpi->getParticipatingCommSize()) {
// Check for errors
const int t_err_count = mpi->receiveErrors(err_detail);
err_count += t_err_count;
Expand All @@ -245,7 +288,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
unsigned int run_id = r.load();
if (run_id == detail::MPISimRunner::Signal::RunFailed) {
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i);
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
++err_count;
if (config.error_level == EnsembleConfig::Fast) {
// Skip to end to kill workers
Expand All @@ -267,8 +310,8 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
// Yield, rather than hammering the processor
std::this_thread::yield();
}
} else {
// Wait for all runs to have been assigned, and all MPI runners to have been notified of fin
} else if (mpi->getRankIsParticipating()) {
// Wait for all runs to have been assigned, and all MPI runners to have been notified of fin. ranks without GPU(s) do not request jobs.
unsigned int next_run = 0;
MPI_Status status;
while (next_run < plans.size()) {
Expand All @@ -280,7 +323,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
const unsigned int failed_run_id = err_cts[i].exchange(UINT_MAX);
++err_count;
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i);
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
runner_status = detail::MPISimRunner::Signal::RequestJob;
}
if (runner_status == detail::MPISimRunner::Signal::RequestJob) {
Expand All @@ -303,7 +346,8 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
if (r.exchange(plans.size()) == detail::MPISimRunner::Signal::RunFailed) {
++err_count;
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i);
fprintf(stdout, "rank %d: %s::%d\n", mpi->world_rank, __FILE__, __LINE__); fflush(stdout);
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
}
}
// Wait for all runners to exit
Expand All @@ -313,7 +357,8 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
if (next_runs[i].load() == detail::MPISimRunner::Signal::RunFailed) {
++err_count;
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i);
fprintf(stdout, "rank %d: %s::%d\n", mpi->world_rank, __FILE__, __LINE__); fflush(stdout);
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
}
}
#endif
Expand Down
Loading

0 comments on commit bcc66be

Please sign in to comment.