
Commit

Requires more tests, but I think it should work.
Robadob committed Nov 3, 2023
1 parent e202c52 commit e1950ec
Showing 1 changed file with 70 additions and 30 deletions.
100 changes: 70 additions & 30 deletions src/flamegpu/simulation/CUDAEnsemble.cu
@@ -201,6 +201,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
#ifdef FLAMEGPU_ENABLE_MPI
// In MPI mode, Rank 0 will collect errors from all ranks
std::multimap<int, detail::AbstractSimRunner::ErrorDetail> err_detail = {};
const MPI_Datatype MPI_ERROR_DETAIL = config.mpi ? detail::AbstractSimRunner::createErrorDetailMPIDatatype() : 0;
#endif
std::vector<detail::AbstractSimRunner::ErrorDetail> err_detail_local = {};
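For context, a minimal sketch of what detail::AbstractSimRunner::createErrorDetailMPIDatatype() plausibly does: registering the ErrorDetail struct as an MPI datatype so a whole record can travel in one message. The field names (run_id, device_id, runner_id, exception_string) are inferred from their usage later in this diff; the fixed buffer size and exact layout are assumptions, not the repository's actual definition.

// Hypothetical sketch only -- not the actual implementation in AbstractSimRunner.
#include <mpi.h>
#include <cstddef>  // offsetof

struct ErrorDetailSketch {            // assumed layout of detail::AbstractSimRunner::ErrorDetail
    unsigned int run_id;
    unsigned int device_id;
    unsigned int runner_id;
    char exception_string[1024];      // assumed fixed-size buffer for the exception message
};

MPI_Datatype createErrorDetailMPIDatatypeSketch() {
    int block_lengths[4] = {1, 1, 1, 1024};
    MPI_Aint displacements[4] = {
        offsetof(ErrorDetailSketch, run_id),
        offsetof(ErrorDetailSketch, device_id),
        offsetof(ErrorDetailSketch, runner_id),
        offsetof(ErrorDetailSketch, exception_string)};
    MPI_Datatype types[4] = {MPI_UNSIGNED, MPI_UNSIGNED, MPI_UNSIGNED, MPI_CHAR};
    MPI_Datatype error_detail_type;
    MPI_Type_create_struct(4, block_lengths, displacements, types, &error_detail_type);
    MPI_Type_commit(&error_detail_type);
    return error_detail_type;  // usable as the datatype argument of MPI_Send/MPI_Recv below
}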

@@ -215,7 +216,6 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
unsigned int err_count = 0;
if (config.mpi) {
#ifdef FLAMEGPU_ENABLE_MPI
const MPI_Datatype MPI_ERROR_DETAIL = detail::AbstractSimRunner::createErrorDetailMPIDatatype();
// Setup MPISimRunners
detail::MPISimRunner** runners = static_cast<detail::MPISimRunner**>(malloc(sizeof(detail::MPISimRunner*) * TOTAL_RUNNERS));
std::vector<std::atomic<unsigned int>> err_cts(TOTAL_RUNNERS);
@@ -260,13 +260,13 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
detail::AbstractSimRunner::ErrorDetail e_detail;
memset(&e_detail, 0, sizeof(detail::AbstractSimRunner::ErrorDetail));
MPI_Recv(
&e_detail, // void* data
1, // int count
MPI_ERROR_DETAIL, // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
MPI_ANY_SOURCE, // int source
EnvelopeTag::ReportError, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&status); // MPI_Status*
err_detail.emplace(status.MPI_SOURCE, e_detail);
++err_count;
if (config.error_level == EnsembleConfig::Fast) {
@@ -490,38 +490,45 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
runners[i]->join();
delete runners[i];
if (next_runs[i].load() == detail::MPISimRunner::Signal::RunFailed) {
++err_count;
// Fetch error detail
detail::AbstractSimRunner::ErrorDetail e_detail;
{
// log_export_queue_mutex is treated as our protection for race conditions on err_detail
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
// Fetch corresponding error detail
bool success = false;
const unsigned int t_device_id = i / config.concurrent_runs;
const unsigned int t_runner_id = i % config.concurrent_runs;
for (auto it = err_detail_local.begin(); it != err_detail_local.end(); ++it) {
if (it->runner_id == t_runner_id && it->device_id == t_device_id) {
e_detail = *it;
err_detail.emplace(world_rank, e_detail);
err_detail_local.erase(it);
success = true;
break;
}
}
if (!success) {
THROW exception::UnknownInternalError("Management thread failed to locate reported error from device %u runner %u, in CUDAEnsemble::simulate()", t_device_id, t_runner_id);
}
}
if (world_rank == 0) {
// Progress flush
if (config.verbosity >= Verbosity::Default && config.error_level != EnsembleConfig::Fast) {
fprintf(stderr, "Warning: Run %u/%u failed on rank %d, device %d, thread %u with exception: \n%s\n",
e_detail.run_id + 1, static_cast<unsigned int>(plans.size()), world_rank, e_detail.device_id, e_detail.runner_id, e_detail.exception_string);
fflush(stderr);
}
} else {
// @todo
// Notify 0 that an error occurred, with the error detail
MPI_Send(
&e_detail, // void* data
1, // int count
MPI_ERROR_DETAIL, // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
0, // int destination
EnvelopeTag::ReportError, // int tag
MPI_COMM_WORLD); // MPI_Comm communicator
}
}
}
@@ -569,6 +576,39 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
if (config.mpi) {
// Ensure all workers have finished before exit
MPI_Barrier(MPI_COMM_WORLD);
// Check whether MPI runners have reported any final errors
MPI_Status status;
int flag;
MPI_Iprobe(
MPI_ANY_SOURCE, // int source
EnvelopeTag::ReportError, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&flag, // int flag
&status);
while (flag) {
// Receive the message
memset(&status, 0, sizeof(MPI_Status));
detail::AbstractSimRunner::ErrorDetail e_detail;
memset(&e_detail, 0, sizeof(detail::AbstractSimRunner::ErrorDetail));
MPI_Recv(
&e_detail, // void* data
1, // int count
MPI_ERROR_DETAIL, // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL)
MPI_ANY_SOURCE, // int source
EnvelopeTag::ReportError, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&status); // MPI_Status*
err_detail.emplace(status.MPI_SOURCE, e_detail);
++err_count;
// Progress flush
if (config.verbosity >= Verbosity::Default && config.error_level != EnsembleConfig::Fast) {
fprintf(stderr, "Warning: Run %u/%u failed on rank %d, device %d, thread %u with exception: \n%s\n",
e_detail.run_id + 1, static_cast<unsigned int>(plans.size()), status.MPI_SOURCE, e_detail.device_id, e_detail.runner_id, e_detail.exception_string);
fflush(stderr);
}
// Check again
MPI_Iprobe(MPI_ANY_SOURCE, EnvelopeTag::ReportError, MPI_COMM_WORLD, &flag, &status);
}
if (config.telemetry) {
// All ranks should notify rank 0 of their GPU devices
if (world_rank == 0) {
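Taken together, the hunks above add a report-to-rank-0 error protocol: worker ranks send their ErrorDetail with a dedicated tag, and rank 0 drains outstanding reports after the barrier (it also receives reports earlier, in the blocking MPI_Recv loop while runs are still executing). Below is a condensed, self-contained sketch of that flow, not the repository's code; the tag constant, payload buffer, and datatype parameter are simplified stand-ins for EnvelopeTag::ReportError, ErrorDetail, and MPI_ERROR_DETAIL.

#include <mpi.h>

static const int kReportErrorTag = 2;  // stand-in for EnvelopeTag::ReportError

void reportAndDrainErrorsSketch(int world_rank, MPI_Datatype error_detail_type,
                                void* local_error /* one ErrorDetail, or nullptr */) {
    if (world_rank != 0 && local_error != nullptr) {
        // Worker rank: forward its error detail to rank 0.
        MPI_Send(local_error, 1, error_detail_type, 0, kReportErrorTag, MPI_COMM_WORLD);
    }
    MPI_Barrier(MPI_COMM_WORLD);
    if (world_rank == 0) {
        // Rank 0: drain any error reports still pending after the barrier.
        int flag = 0;
        MPI_Status status;
        MPI_Iprobe(MPI_ANY_SOURCE, kReportErrorTag, MPI_COMM_WORLD, &flag, &status);
        while (flag) {
            unsigned char detail[4096];  // assumed >= the extent of one ErrorDetail
            MPI_Recv(detail, 1, error_detail_type, MPI_ANY_SOURCE, kReportErrorTag,
                     MPI_COMM_WORLD, &status);
            // status.MPI_SOURCE identifies which rank reported the error.
            MPI_Iprobe(MPI_ANY_SOURCE, kReportErrorTag, MPI_COMM_WORLD, &flag, &status);
        }
    }
}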
