Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
Fix a race condition, and the logger always being killed on error

Tests now consistently pass with mpirun -n 2
  • Loading branch information
Robadob committed Nov 7, 2023
1 parent f5c9962 commit 626de57
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 11 deletions.
6 changes: 4 additions & 2 deletions include/flamegpu/simulation/detail/MPIEnsemble.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <map>
#include <string>
#include <mutex>
#include <vector>

#include "flamegpu/simulation/CUDAEnsemble.h"
#include "flamegpu/simulation/detail/MPISimRunner.h"
Expand Down Expand Up @@ -81,7 +80,6 @@ class MPIEnsemble {
void retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex,
std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail,
std::vector<AbstractSimRunner::ErrorDetail> &err_detail_local, int i);

Check failure on line 82 in include/flamegpu/simulation/detail/MPIEnsemble.h

View workflow job for this annotation

GitHub Actions / cpplint (11.8, ubuntu-20.04)

Add #include <vector> for vector<>

private:

Check failure on line 83 in include/flamegpu/simulation/detail/MPIEnsemble.h

View workflow job for this annotation

GitHub Actions / cpplint (11.8, ubuntu-20.04)

"private:" should be preceded by a blank line
/**
* @return Retrieve the local world rank from MPI
Expand All @@ -95,6 +93,10 @@ class MPIEnsemble {
* If necessary initialise MPI, else do nothing
*/
void initMPI();
/**
* Iterate config.devices to find the item at index j
*/
unsigned int getDeviceIndex(const int j);
};
} // namespace detail
} // namespace flamegpu
Expand Down
13 changes: 10 additions & 3 deletions src/flamegpu/simulation/CUDAEnsemble.cu
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,19 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
if (runner_status == detail::MPISimRunner::Signal::RunFailed) {
// Fetch the job id, increment local error counter
const unsigned int failed_run_id = err_cts[i].exchange(UINT_MAX);
++err_count;
++err_count;

Check failure on line 281 in src/flamegpu/simulation/CUDAEnsemble.cu

View workflow job for this annotation

GitHub Actions / cpplint (11.8, ubuntu-20.04)

Line ends in whitespace. Consider deleting these extra spaces.
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i);
runner_status = detail::MPISimRunner::Signal::RequestJob;
}
if (runner_status == detail::MPISimRunner::Signal::RequestJob) {
next_run = mpi->requestJob();
// Pass the job to runner that requested it
next_runs[i].store(next_run);
// Break if assigned job is out of range, work is finished
if (next_run >= plans.size()) {
break;
}
// Pass the job to runner that requested it
next_runs[i].store(next_run);
}
}
std::this_thread::yield();
Expand Down Expand Up @@ -438,6 +438,13 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
}
}

#ifdef FLAMEGPU_ENABLE_MPI
if (config.mpi && mpi->world_rank != 0) {
// All errors are reported via rank 0
err_count = 0;
}
#endif

if (config.error_level == EnsembleConfig::Fast && err_count) {
if (config.mpi) {
#ifdef FLAMEGPU_ENABLE_MPI
Expand Down
21 changes: 17 additions & 4 deletions src/flamegpu/simulation/detail/MPIEnsemble.cu
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,15 @@ void MPIEnsemble::initMPI() {
}
}
}
void MPIEnsemble::retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex, std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail,
unsigned int MPIEnsemble::getDeviceIndex(const int j) {
    // Walk config.devices in order, counting elements until position j is reached.
    // Returns the device id stored at that position, or UINT_MAX if j is out of
    // range (including negative j, which can never match the counter).
    int position = 0;
    for (const auto &device : config.devices) {
        if (position == j)
            return device;
        ++position;
    }
    // No element at index j
    return UINT_MAX;
}
void MPIEnsemble::retrieveLocalErrorDetail(std::mutex &log_export_queue_mutex, std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail,

Check failure on line 220 in src/flamegpu/simulation/detail/MPIEnsemble.cu

View workflow job for this annotation

GitHub Actions / cpplint (11.8, ubuntu-20.04)

Line ends in whitespace. Consider deleting these extra spaces.
std::vector<detail::AbstractSimRunner::ErrorDetail> &err_detail_local, const int i) {
// Fetch error detail
detail::AbstractSimRunner::ErrorDetail e_detail;
Expand All @@ -218,19 +226,24 @@ std::vector<detail::AbstractSimRunner::ErrorDetail> &err_detail_local, const int
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
// Fetch corresponding error detail
bool success = false;
const unsigned int t_device_id = i / config.concurrent_runs;
const unsigned int t_device_id = getDeviceIndex(i / config.concurrent_runs);
const unsigned int t_runner_id = i % config.concurrent_runs;
for (auto it = err_detail_local.begin(); it != err_detail_local.end(); ++it) {
if (it->runner_id == t_runner_id && it->device_id == t_device_id) {
e_detail = *it;
err_detail.emplace(world_rank, e_detail);
if (world_rank == 0) {
// Only rank 0 collects error details
err_detail.emplace(world_rank, e_detail);
} else {
// fprintf(stderr, "[%d] Purged error from device %u runner %u\n", world_rank, t_device_id, t_runner_id); // useful debug, breaks tests
}
err_detail_local.erase(it);
success = true;
break;
}
}
if (!success) {
THROW exception::UnknownInternalError("Management thread failed to locate reported error from device %u runner %u, in CUDAEnsemble::simulate()", t_device_id, t_runner_id);
THROW exception::UnknownInternalError("[%d] Management thread failed to locate reported error from device %u runner %u from %u errors, in CUDAEnsemble::simulate()", world_rank, t_device_id, t_runner_id, static_cast<unsigned int>(err_detail_local.size()));
}
}
if (world_rank == 0) {
Expand Down
3 changes: 1 addition & 2 deletions src/flamegpu/simulation/detail/MPISimRunner.cu
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ void MPISimRunner::main() {
} catch(std::exception &e) {
// log_export_mutex is treated as our protection for race conditions on err_detail
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
log_export_queue.push(UINT_MAX);
// Build the error detail (fixed len char array for string)
printf("Fail: run: %u device: %u, runner: %u\n", run_id, device_id, runner_id);
// fprintf(stderr, "Fail: run: %u device: %u, runner: %u\n", run_id, device_id, runner_id); // useful debug, breaks tests
err_detail.push_back(ErrorDetail{run_id, static_cast<unsigned int>(device_id), runner_id, });
strncpy(err_detail.back().exception_string, e.what(), sizeof(ErrorDetail::exception_string)-1);
err_detail.back().exception_string[sizeof(ErrorDetail::exception_string) - 1] = '\0';
Expand Down
10 changes: 10 additions & 0 deletions tests/test_cases/simulation/test_mpi_ensemble.cu
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ TEST_F(TestMPIEnsemble, error_off) {
// Get stderr and stdout
const std::string output = testing::internal::GetCapturedStdout();
const std::string errors = testing::internal::GetCapturedStderr();
// printf("[%d]output:\n:%s\n", world_rank, output.c_str());
// printf("[%d]errors:\n:%s\n", world_rank, errors.c_str());
// Expect no warnings (stderr) but outputs on progress and timing
if (world_rank == 0) {
// With error off, we would expect to see run index 10 fail
Expand Down Expand Up @@ -233,6 +235,8 @@ TEST_F(TestMPIEnsemble, error_slow) {
// Get stderr and stdout
const std::string output = testing::internal::GetCapturedStdout();
const std::string errors = testing::internal::GetCapturedStderr();
// printf("[%d]output:\n:%s\n", world_rank, output.c_str());
// printf("[%d]errors:\n:%s\n", world_rank, errors.c_str());
EXPECT_TRUE(output.find("MPI ensemble assigned run ") != std::string::npos); // E.g. MPI ensemble assigned run %d/%u to rank %d
EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos); // E.g. CUDAEnsemble completed 2 runs successfully!
EXPECT_TRUE(errors.find("Warning: Run 9/") != std::string::npos); // E.g. Warning: Run 10/10 failed on rank 0, device 0, thread 0 with exception:
Expand All @@ -244,6 +248,8 @@ TEST_F(TestMPIEnsemble, error_slow) {
// Get stderr and stdout
const std::string output = testing::internal::GetCapturedStdout();
const std::string errors = testing::internal::GetCapturedStderr();
// printf("[%d]output:\n:%s\n", world_rank, output.c_str());
// printf("[%d]errors:\n:%s\n", world_rank, errors.c_str());
EXPECT_TRUE(output.empty());
EXPECT_TRUE(errors.empty());
}
Expand All @@ -264,6 +270,8 @@ TEST_F(TestMPIEnsemble, error_fast) {
// Get stderr and stdout
const std::string output = testing::internal::GetCapturedStdout();
const std::string errors = testing::internal::GetCapturedStderr();
// printf("[%d]output:\n:%s\n", world_rank, output.c_str());
// printf("[%d]errors:\n:%s\n", world_rank, errors.c_str());
EXPECT_TRUE(output.find("MPI ensemble assigned run ") != std::string::npos); // E.g. MPI ensemble assigned run %d/%u to rank %d
#ifdef _DEBUG
EXPECT_TRUE(errors.find("Warning: Run 9/") != std::string::npos); // E.g. Warning: Run 10/10 failed on rank 0, device 0, thread 0 with exception:
Expand All @@ -277,6 +285,8 @@ TEST_F(TestMPIEnsemble, error_fast) {
// Get stderr and stdout
const std::string output = testing::internal::GetCapturedStdout();
const std::string errors = testing::internal::GetCapturedStderr();
// printf("[%d]output:\n:%s\n", world_rank, output.c_str());
// printf("[%d]errors:\n:%s\n", world_rank, errors.c_str());
EXPECT_TRUE(output.empty());
EXPECT_TRUE(errors.empty());
}
Expand Down

0 comments on commit 626de57

Please sign in to comment.