Fix MPIEnsemble init order
Adds a single, simple Python test case (it calls no MPI commands externally, to make it more precise)
Robadob committed Nov 7, 2023
1 parent 0431701 commit e01a2c5
Showing 4 changed files with 72 additions and 5 deletions.
5 changes: 4 additions & 1 deletion include/flamegpu/simulation/detail/MPIEnsemble.h
@@ -15,7 +15,6 @@ namespace flamegpu {
namespace detail {

class MPIEnsemble {
-const MPI_Datatype MPI_ERROR_DETAIL;
const CUDAEnsemble::EnsembleConfig &config;
// Tags to differentiate the MPI messages used in the protocol
enum EnvelopeTag : int {
@@ -98,6 +97,10 @@ class MPIEnsemble {
* Iterate config.devices to find the item at index j
*/
unsigned int getDeviceIndex(const int j);
+/**
+ * MPI representation of AbstractSimRunner::ErrorDetail type
+ */
+const MPI_Datatype MPI_ERROR_DETAIL;
};
} // namespace detail
} // namespace flamegpu
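Context for the change above: C++ constructs non-static data members in declaration order, not in the order they appear in a constructor's member initializer list. With MPI_ERROR_DETAIL declared first, AbstractSimRunner::createErrorDetailMPIDatatype() ran before world_rank and world_size were initialised (whose getters presumably perform the MPI initialisation), i.e. before MPI was ready. Moving the declaration to the end of the class makes it the last member constructed. A minimal sketch of the underlying rule, using a hypothetical class rather than FLAME GPU code:

// Illustrative only (not FLAME GPU code): members are constructed in declaration
// order, regardless of the order written in the constructor's initializer list.
#include <iostream>

struct Example {
    int b;  // declared first, so constructed first
    int a;  // declared second, so constructed second

    // The list writes a before b, but b is still constructed first and reads a
    // before a has been initialised (GCC/Clang warn about this with -Wreorder).
    Example() : a(1), b(a + 1) {}
};

int main() {
    Example e;
    std::cout << e.a << " " << e.b << "\n";  // e.b holds an indeterminate value
    return 0;
}

The matching reorder of the initializer list in MPIEnsemble.cu (next file) keeps the written order consistent with the new declaration order, which also avoids -Wreorder warnings.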
7 changes: 4 additions & 3 deletions src/flamegpu/simulation/detail/MPIEnsemble.cu
@@ -7,11 +7,11 @@ namespace flamegpu {
namespace detail {

MPIEnsemble::MPIEnsemble(const CUDAEnsemble::EnsembleConfig &_config, const unsigned int _total_runs)
-: MPI_ERROR_DETAIL(AbstractSimRunner::createErrorDetailMPIDatatype())
-, config(_config)
+: config(_config)
, world_rank(getWorldRank())
, world_size(getWorldSize())
-, total_runs(_total_runs) { }
+, total_runs(_total_runs)
+, MPI_ERROR_DETAIL(AbstractSimRunner::createErrorDetailMPIDatatype()) { }

int MPIEnsemble::receiveErrors(std::multimap<int, AbstractSimRunner::ErrorDetail> &err_detail) {
int errCount = 0;
@@ -196,6 +196,7 @@ int MPIEnsemble::getWorldSize() {
return world_size;
}
void MPIEnsemble::initMPI() {
printf("MPI IS BEING INIT\n");
int flag = 0;
// MPI can only be init once, for certain test cases we do some initial MPI comms for setup
MPI_Initialized(&flag);
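The MPI_Initialized call in the surrounding context is the standard guard that makes initialisation idempotent: MPI_Init may only be called once per process, so the code first checks whether something else (for example a test fixture) has already initialised MPI. A rough standalone sketch of that pattern, not the FLAME GPU implementation:

// Sketch of an idempotent MPI initialisation guard (standalone example,
// assumes <mpi.h> and an MPI implementation are available).
#include <mpi.h>

void initMPIOnce(int *argc, char ***argv) {
    int initialized = 0;
    MPI_Initialized(&initialized);  // has MPI_Init already been called in this process?
    if (!initialized) {
        MPI_Init(argc, argv);       // only reached on the first call
    }
}

A matching MPI_Finalized check typically guards shutdown in the same way; the new Python test below relies on pyflamegpu.cleanup() to trigger MPI finalisation, per its closing comment.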
63 changes: 63 additions & 0 deletions tests/python/test_mpi.py
@@ -0,0 +1,63 @@
import pytest
from unittest import TestCase
from pyflamegpu import *
import time


class model_step(pyflamegpu.HostFunction):
    def run(self, FLAMEGPU):
        counter = FLAMEGPU.environment.getPropertyInt("counter")
        counter += 1
        FLAMEGPU.environment.setPropertyInt("counter", counter)
        time.sleep(0.1)  # Sleep 100ms

class throw_exception(pyflamegpu.HostFunction):
    def run(self, FLAMEGPU):
        counter = FLAMEGPU.environment.getPropertyInt("counter")
        init_counter = FLAMEGPU.environment.getPropertyInt("init_counter")
        if FLAMEGPU.getStepCounter() == 1 and counter == 8:
            raise Exception("Exception thrown by host fn throw_exception()")


class TestMPIEnsemble(TestCase):

    def test_mpi(self):
        # init model
        model = pyflamegpu.ModelDescription("MPITest")
        model.Environment().newPropertyInt("counter", -1)
        model.Environment().newPropertyInt("init_counter", -1)
        model.newAgent("agent")
        model.newLayer().addHostFunction(model_step())
        model.newLayer().addHostFunction(throw_exception())
        # init plans
        RUNS_PER_RANK = 10
        world_size = 2  # can't probe MPI easily, so just default to 2
        plans = pyflamegpu.RunPlanVector(model, RUNS_PER_RANK * world_size)
        plans.setSteps(10)
        plans.setPropertyLerpRangeInt("counter", 0, (RUNS_PER_RANK * world_size) - 1)
        plans.setPropertyLerpRangeInt("init_counter", 0, (RUNS_PER_RANK * world_size) - 1)
        # init exit logging config
        exit_log_cfg = pyflamegpu.LoggingConfig(model)
        exit_log_cfg.logEnvironment("counter")
        # init ensemble
        ensemble = pyflamegpu.CUDAEnsemble(model)
        ensemble.Config().concurrent_runs = 1
        ensemble.Config().error_level = pyflamegpu.CUDAEnsembleConfig.Off
        ## Can't do anything fancy like splitting GPUs
        ensemble.setExitLog(exit_log_cfg)
        # Run the ensemble
        err_count = ensemble.simulate(plans)
        ## Check that 1 error occurred at rank 0
        ## err_count == 1
        # Validate logs
        ## @note Best we can currently do is check that the logs of each runner have the correct results
        ## @note Ideally we'd validate between nodes to ensure all runs have been completed
        logs = ensemble.getLogs()
        for index, log in logs.items():
            exit_log = log.getExitLog()
            # Get a logged environment property
            counter = exit_log.getEnvironmentPropertyInt("counter")
            assert counter == index + 10

        # cleanup to trigger MPI finalize
        pyflamegpu.cleanup()
2 changes: 1 addition & 1 deletion tests/test_cases/simulation/test_mpi_ensemble.cu
@@ -120,7 +120,7 @@ class TestMPIEnsemble : public testing::Test {
void initEnsemble() {
initPlans();
initExitLoggingConfig();
-ensemble = new flamegpu::CUDAEnsemble (*model);
+ensemble = new flamegpu::CUDAEnsemble(*model);
ensemble->Config().concurrent_runs = 1;
if (group_size == world_size) {
// Single node MPI run
