diff --git a/include/flamegpu/simulation/detail/MPIEnsemble.h b/include/flamegpu/simulation/detail/MPIEnsemble.h index b9ebc5d44..0d68b7e4b 100644 --- a/include/flamegpu/simulation/detail/MPIEnsemble.h +++ b/include/flamegpu/simulation/detail/MPIEnsemble.h @@ -6,7 +6,6 @@ #include #include #include -#include #include "flamegpu/simulation/CUDAEnsemble.h" #include "flamegpu/simulation/detail/MPISimRunner.h" @@ -81,7 +80,6 @@ class MPIEnsemble { void retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex, std::multimap &err_detail, std::vector &err_detail_local, int i); - private: /** * @return Retrieve the local world rank from MPI @@ -95,6 +93,10 @@ class MPIEnsemble { * If necessary initialise MPI, else do nothing */ void initMPI(); + /** + * Iterate config.devices to find the item at index j + */ + unsigned int getDeviceIndex(const int j); }; } // namespace detail } // namespace flamegpu diff --git a/src/flamegpu/simulation/CUDAEnsemble.cu b/src/flamegpu/simulation/CUDAEnsemble.cu index cb135181d..eecf7c045 100644 --- a/src/flamegpu/simulation/CUDAEnsemble.cu +++ b/src/flamegpu/simulation/CUDAEnsemble.cu @@ -278,19 +278,19 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) { if (runner_status == detail::MPISimRunner::Signal::RunFailed) { // Fetch the job id, increment local error counter const unsigned int failed_run_id = err_cts[i].exchange(UINT_MAX); - ++err_count; + ++err_count; // Retrieve and handle local error detail mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i); runner_status = detail::MPISimRunner::Signal::RequestJob; } if (runner_status == detail::MPISimRunner::Signal::RequestJob) { next_run = mpi->requestJob(); + // Pass the job to runner that requested it + next_runs[i].store(next_run); // Break if assigned job is out of range, work is finished if (next_run >= plans.size()) { break; } - // Pass the job to runner that requested it - next_runs[i].store(next_run); } } std::this_thread::yield(); @@ -438,6 +438,13 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) { } } +#ifdef FLAMEGPU_ENABLE_MPI + if (config.mpi && mpi->world_rank != 0) { + // All errors are reported via rank 0 + err_count = 0; + } +#endif + if (config.error_level == EnsembleConfig::Fast && err_count) { if (config.mpi) { #ifdef FLAMEGPU_ENABLE_MPI diff --git a/src/flamegpu/simulation/detail/MPIEnsemble.cu b/src/flamegpu/simulation/detail/MPIEnsemble.cu index fadaf6220..60c15c01c 100644 --- a/src/flamegpu/simulation/detail/MPIEnsemble.cu +++ b/src/flamegpu/simulation/detail/MPIEnsemble.cu @@ -209,7 +209,15 @@ void MPIEnsemble::initMPI() { } } } -void MPIEnsemble::retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex, std::multimap &err_detail, +unsigned int MPIEnsemble::getDeviceIndex(const int j) { + int i = 0; + for (auto& d : config.devices) { + if (i++ == j) + return d; + } + return UINT_MAX; +} +void MPIEnsemble::retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex, std::multimap &err_detail, std::vector &err_detail_local, const int i) { // Fetch error detail detail::AbstractSimRunner::ErrorDetail e_detail; @@ -218,19 +226,24 @@ std::vector &err_detail_local, const int std::lock_guard lck(log_export_queue_mutex); // Fetch corresponding error detail bool success = false; - const unsigned int t_device_id = i / config.concurrent_runs; + const unsigned int t_device_id = getDeviceIndex(i / config.concurrent_runs); const unsigned int t_runner_id = i % config.concurrent_runs; for (auto it = err_detail_local.begin(); it != err_detail_local.end(); ++it) { if (it->runner_id == t_runner_id && it->device_id == t_device_id) { e_detail = *it; - err_detail.emplace(world_rank, e_detail); + if (world_rank == 0) { + // Only rank 0 collects error details + err_detail.emplace(world_rank, e_detail); + } else { + // fprintf(stderr, "[%d] Purged error from device %u runner %u\n", world_rank, t_device_id, t_runner_id); // useful debug, breaks tests + } err_detail_local.erase(it); success = true; break; } } if (!success) { - THROW exception::UnknownInternalError("Management thread failed to locate reported error from device %u runner %u, in CUDAEnsemble::simulate()", t_device_id, t_runner_id); + THROW exception::UnknownInternalError("[%d] Management thread failed to locate reported error from device %u runner %u from %u errors, in CUDAEnsemble::simulate()", world_rank, t_device_id, t_runner_id, static_cast(err_detail_local.size())); } } if (world_rank == 0) { diff --git a/src/flamegpu/simulation/detail/MPISimRunner.cu b/src/flamegpu/simulation/detail/MPISimRunner.cu index 7d799271c..164b2988d 100644 --- a/src/flamegpu/simulation/detail/MPISimRunner.cu +++ b/src/flamegpu/simulation/detail/MPISimRunner.cu @@ -65,9 +65,8 @@ void MPISimRunner::main() { } catch(std::exception &e) { // log_export_mutex is treated as our protection for race conditions on err_detail std::lock_guard lck(log_export_queue_mutex); - log_export_queue.push(UINT_MAX); // Build the error detail (fixed len char array for string) - printf("Fail: run: %u device: %u, runner: %u\n", run_id, device_id, runner_id); + // fprintf(stderr, "Fail: run: %u device: %u, runner: %u\n", run_id, device_id, runner_id); // useful debug, breaks tests err_detail.push_back(ErrorDetail{run_id, static_cast(device_id), runner_id, }); strncpy(err_detail.back().exception_string, e.what(), sizeof(ErrorDetail::exception_string)-1); err_detail.back().exception_string[sizeof(ErrorDetail::exception_string) - 1] = '\0'; diff --git a/tests/test_cases/simulation/test_mpi_ensemble.cu b/tests/test_cases/simulation/test_mpi_ensemble.cu index 37ab47fd1..89de24f32 100644 --- a/tests/test_cases/simulation/test_mpi_ensemble.cu +++ b/tests/test_cases/simulation/test_mpi_ensemble.cu @@ -199,6 +199,8 @@ TEST_F(TestMPIEnsemble, error_off) { // Get stderr and stdout const std::string output = testing::internal::GetCapturedStdout(); const std::string errors = testing::internal::GetCapturedStderr(); + // printf("[%d]output:\n:%s\n", world_rank, output.c_str()); + // printf("[%d]errors:\n:%s\n", world_rank, errors.c_str()); // Expect no warnings (stderr) but outputs on progress and timing if (world_rank == 0) { // With error off, we would expect to see run index 10 fail @@ -233,6 +235,8 @@ TEST_F(TestMPIEnsemble, error_slow) { // Get stderr and stdout const std::string output = testing::internal::GetCapturedStdout(); const std::string errors = testing::internal::GetCapturedStderr(); + // printf("[%d]output:\n:%s\n", world_rank, output.c_str()); + // printf("[%d]errors:\n:%s\n", world_rank, errors.c_str()); EXPECT_TRUE(output.find("MPI ensemble assigned run ") != std::string::npos); // E.g. MPI ensemble assigned run %d/%u to rank %d EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos); // E.g. CUDAEnsemble completed 2 runs successfully! EXPECT_TRUE(errors.find("Warning: Run 9/") != std::string::npos); // E.g. Warning: Run 10/10 failed on rank 0, device 0, thread 0 with exception: @@ -244,6 +248,8 @@ TEST_F(TestMPIEnsemble, error_slow) { // Get stderr and stdout const std::string output = testing::internal::GetCapturedStdout(); const std::string errors = testing::internal::GetCapturedStderr(); + // printf("[%d]output:\n:%s\n", world_rank, output.c_str()); + // printf("[%d]errors:\n:%s\n", world_rank, errors.c_str()); EXPECT_TRUE(output.empty()); EXPECT_TRUE(errors.empty()); } @@ -264,6 +270,8 @@ TEST_F(TestMPIEnsemble, error_fast) { // Get stderr and stdout const std::string output = testing::internal::GetCapturedStdout(); const std::string errors = testing::internal::GetCapturedStderr(); + // printf("[%d]output:\n:%s\n", world_rank, output.c_str()); + // printf("[%d]errors:\n:%s\n", world_rank, errors.c_str()); EXPECT_TRUE(output.find("MPI ensemble assigned run ") != std::string::npos); // E.g. MPI ensemble assigned run %d/%u to rank %d #ifdef _DEBUG EXPECT_TRUE(errors.find("Warning: Run 9/") != std::string::npos); // E.g. Warning: Run 10/10 failed on rank 0, device 0, thread 0 with exception: @@ -277,6 +285,8 @@ TEST_F(TestMPIEnsemble, error_fast) { // Get stderr and stdout const std::string output = testing::internal::GetCapturedStdout(); const std::string errors = testing::internal::GetCapturedStderr(); + // printf("[%d]output:\n:%s\n", world_rank, output.c_str()); + // printf("[%d]errors:\n:%s\n", world_rank, errors.c_str()); EXPECT_TRUE(output.empty()); EXPECT_TRUE(errors.empty()); }