diff --git a/include/flamegpu/simulation/detail/MPIEnsemble.h b/include/flamegpu/simulation/detail/MPIEnsemble.h
index 548956eaf..81d7096e4 100644
--- a/include/flamegpu/simulation/detail/MPIEnsemble.h
+++ b/include/flamegpu/simulation/detail/MPIEnsemble.h
@@ -15,7 +15,6 @@
 namespace flamegpu {
 namespace detail {
 class MPIEnsemble {
-    const MPI_Datatype MPI_ERROR_DETAIL;
     const CUDAEnsemble::EnsembleConfig &config;
     // Tags to different the MPI messages used in protocol
     enum EnvelopeTag : int {
@@ -98,6 +97,10 @@
      * Iterate config.devices to find the item at index j
      */
     unsigned int getDeviceIndex(const int j);
+    /**
+     * MPI representation of AbstractSimRunner::ErrorDetail type
+     */
+    const MPI_Datatype MPI_ERROR_DETAIL;
 };
 }  // namespace detail
 }  // namespace flamegpu
diff --git a/src/flamegpu/simulation/detail/MPIEnsemble.cu b/src/flamegpu/simulation/detail/MPIEnsemble.cu
index 33a904894..f186c5a31 100644
--- a/src/flamegpu/simulation/detail/MPIEnsemble.cu
+++ b/src/flamegpu/simulation/detail/MPIEnsemble.cu
@@ -7,11 +7,11 @@
 namespace flamegpu {
 namespace detail {
 MPIEnsemble::MPIEnsemble(const CUDAEnsemble::EnsembleConfig &_config, const unsigned int _total_runs)
-    : MPI_ERROR_DETAIL(AbstractSimRunner::createErrorDetailMPIDatatype())
-    , config(_config)
+    : config(_config)
     , world_rank(getWorldRank())
     , world_size(getWorldSize())
-    , total_runs(_total_runs) { }
+    , total_runs(_total_runs)
+    , MPI_ERROR_DETAIL(AbstractSimRunner::createErrorDetailMPIDatatype()) { }
 
 int MPIEnsemble::receiveErrors(std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail) {
     int errCount = 0;
diff --git a/tests/python/test_mpi.py b/tests/python/test_mpi.py
new file mode 100644
index 000000000..9678b81d9
--- /dev/null
+++ b/tests/python/test_mpi.py
@@ -0,0 +1,63 @@
+import pytest
+from unittest import TestCase
+from pyflamegpu import *
+import time
+
+
+class model_step(pyflamegpu.HostFunction):
+    def run(self, FLAMEGPU):
+        counter = FLAMEGPU.environment.getPropertyInt("counter")
+        counter += 1
+        FLAMEGPU.environment.setPropertyInt("counter", counter)
+        time.sleep(0.1)  # Sleep 100ms
+
+class throw_exception(pyflamegpu.HostFunction):
+    def run(self, FLAMEGPU):
+        counter = FLAMEGPU.environment.getPropertyInt("counter")
+        init_counter = FLAMEGPU.environment.getPropertyInt("init_counter")
+        if FLAMEGPU.getStepCounter() == 1 and counter == 8:
+            raise Exception("Exception thrown by host fn throw_exception()")
+
+
+class TestMPIEnsemble(TestCase):
+
+    def test_mpi(self):
+        # init model
+        model = pyflamegpu.ModelDescription("MPITest")
+        model.Environment().newPropertyInt("counter", -1)
+        model.Environment().newPropertyInt("init_counter", -1)
+        model.newAgent("agent")
+        model.newLayer().addHostFunction(model_step())
+        model.newLayer().addHostFunction(throw_exception())
+        # init plans
+        RUNS_PER_RANK = 10
+        world_size = 2  # can't probe MPI easily, so just default to 2
+        plans = pyflamegpu.RunPlanVector(model, RUNS_PER_RANK * world_size)
+        plans.setSteps(10)
+        plans.setPropertyLerpRangeInt("counter", 0, (RUNS_PER_RANK * world_size) - 1)
+        plans.setPropertyLerpRangeInt("init_counter", 0, (RUNS_PER_RANK * world_size) - 1)
+        # init exit logging config
+        exit_log_cfg = pyflamegpu.LoggingConfig(model)
+        exit_log_cfg.logEnvironment("counter")
+        # init ensemble
+        ensemble = pyflamegpu.CUDAEnsemble(model)
+        ensemble.Config().concurrent_runs = 1
+        ensemble.Config().error_level = pyflamegpu.CUDAEnsembleConfig.Off
+        ## Can't do anything fancy like splitting GPUs
+        ensemble.setExitLog(exit_log_cfg)
+        # Run the ensemble
+        err_count = ensemble.simulate(plans)
+        ## Check that 1 error occurred at rank 0
+        ## err_count == 1
+        # Validate logs
+        ## @note Best we can currently do is check that the logs of each runner have correct results
+        ## @note Ideally we'd validate between nodes to ensure all runs have been completed
+        logs = ensemble.getLogs()
+        for index, log in logs.items():
+            exit_log = log.getExitLog()
+            # Get a logged environment property
+            counter = exit_log.getEnvironmentPropertyInt("counter")
+            assert counter == index + 10
+
+        # cleanup to trigger MPI finalize
+        pyflamegpu.cleanup()
diff --git a/tests/test_cases/simulation/test_mpi_ensemble.cu b/tests/test_cases/simulation/test_mpi_ensemble.cu
index f325cfc06..959730632 100644
--- a/tests/test_cases/simulation/test_mpi_ensemble.cu
+++ b/tests/test_cases/simulation/test_mpi_ensemble.cu
@@ -120,7 +120,7 @@ class TestMPIEnsemble : public testing::Test {
     void initEnsemble() {
         initPlans();
         initExitLoggingConfig();
-        ensemble = new flamegpu::CUDAEnsemble (*model);
+        ensemble = new flamegpu::CUDAEnsemble(*model);
         ensemble->Config().concurrent_runs = 1;
         if (group_size == world_size) {
             // Single node MPI run
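
A note on the `MPIEnsemble` reordering above: in C++, non-static data members are always initialised in *declaration* order, never in the order they appear in the mem-initialiser list (GCC/Clang's `-Wreorder` warns when the two disagree). Reordering the initialiser list in `MPIEnsemble.cu` alone would therefore change nothing; moving the declaration of `MPI_ERROR_DETAIL` to the end of the class in `MPIEnsemble.h` is what actually defers `AbstractSimRunner::createErrorDetailMPIDatatype()` until after `world_rank`/`world_size` have been initialised, by which point `getWorldRank()` has presumably already triggered `initMPI()`. A minimal sketch of the language rule, using a hypothetical `Demo` type rather than FLAMEGPU code:

```c++
#include <cstdio>

struct Demo {
    static int make(const char *name) { std::printf("init %s\n", name); return 0; }
    int first;   // declared first  => initialised first
    int second;  // declared second => initialised second
    // The list below names 'second' before 'first', but declaration order
    // still wins; -Wreorder flags exactly this mismatch.
    Demo() : second(make("second")), first(make("first")) {}
};

int main() {
    Demo d;  // prints "init first" then "init second"
    return 0;
}
```

The new Python test hard-codes `world_size = 2`, so it is presumably meant to be launched over two ranks, e.g. `mpirun -n 2 python3 -m pytest tests/python/test_mpi.py` (launch command assumed here, not part of the patch).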