From 43a3b34f783232642bd37dbcdd901422e00b9fac Mon Sep 17 00:00:00 2001
From: Robert Chisholm
Date: Thu, 27 Jul 2023 15:37:19 +0100
Subject: [PATCH] draft: move MPI_Finalize() to util::cleanup()

Associated test changes.

Need to test on waimu
---
 include/flamegpu/util/cleanup.h         |   2 +-
 src/flamegpu/simulation/CUDAEnsemble.cu |   3 +-
 src/flamegpu/util/cleanup.cu            |  14 ++
 tests/helpers/main.cu                   |  16 +-
 .../simulation/test_mpi_ensemble.cu     | 217 +++++++++++-------
 tests/test_cases/util/test_cleanup.cu   |   2 +-
 6 files changed, 158 insertions(+), 96 deletions(-)

diff --git a/include/flamegpu/util/cleanup.h b/include/flamegpu/util/cleanup.h
index f25b18f33..5dfa0326f 100644
--- a/include/flamegpu/util/cleanup.h
+++ b/include/flamegpu/util/cleanup.h
@@ -9,7 +9,7 @@ namespace flamegpu {
 namespace util {
 
 /**
- * Method to cleanup / finalise use of FLAMEGPU.
+ * Method to cleanup / finalise use of FLAMEGPU (and MPI).
  * For CUDA implementations, this resets all CUDA devices in the current system, to ensure that CUDA tools such as cuda-memcheck, compute-sanitizer and Nsight Compute.
  *
  * This method should ideally be called as the final method prior to an `exit` or the `return` of the main method, as it is costly and can invalidate device memory allocations, potentially breaking application state.
diff --git a/src/flamegpu/simulation/CUDAEnsemble.cu b/src/flamegpu/simulation/CUDAEnsemble.cu
index 173469323..ef26da97b 100644
--- a/src/flamegpu/simulation/CUDAEnsemble.cu
+++ b/src/flamegpu/simulation/CUDAEnsemble.cu
@@ -490,7 +490,8 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
 
 #ifdef FLAMEGPU_ENABLE_MPI
     if (config.mpi) {
-        MPI_Finalize();
+        // Ensure all workers have finished before exit
+        MPI_Barrier(MPI_COMM_WORLD);
     }
 #endif
     // Record and store the elapsed time
diff --git a/src/flamegpu/util/cleanup.cu b/src/flamegpu/util/cleanup.cu
index 6b680692e..b66e827bb 100644
--- a/src/flamegpu/util/cleanup.cu
+++ b/src/flamegpu/util/cleanup.cu
@@ -1,5 +1,9 @@
 #include "flamegpu/util/cleanup.h"
 
+#ifdef FLAMEGPU_ENABLE_MPI
+#include <mpi.h>
+#endif
+
 #include <cuda_runtime.h>
 #include "flamegpu/simulation/detail/CUDAErrorChecking.cuh"
 #include "flamegpu/detail/JitifyCache.h"
@@ -8,6 +12,16 @@ namespace flamegpu {
 namespace util {
 
 void cleanup() {
+#ifdef FLAMEGPU_ENABLE_MPI
+    int init_flag = 0;
+    int fin_flag = 0;
+    // MPI can only be initialized and finalized once
+    MPI_Initialized(&init_flag);
+    MPI_Finalized(&fin_flag);
+    if (init_flag && !fin_flag) {
+        MPI_Finalize();
+    }
+#endif
     int originalDevice = 0;
     gpuErrchk(cudaGetDevice(&originalDevice));
     // Reset all cuda devices for memcheck / profiling purposes.
diff --git a/tests/helpers/main.cu b/tests/helpers/main.cu
index 2a3e3c174..4f57d3f39 100644
--- a/tests/helpers/main.cu
+++ b/tests/helpers/main.cu
@@ -7,10 +7,11 @@
 #include "helpers/device_initialisation.h"
 #include "flamegpu/io/Telemetry.h"
 #include "flamegpu/detail/TestSuiteTelemetry.h"
+#include "flamegpu/util/cleanup.h"
 
 GTEST_API_ int main(int argc, char **argv) {
-    // Get the current status of telemetry, to control if test suite results shold be submit or not
+    // Get the current status of telemetry, to control if test suite results should be submitted or not
     const bool telemetryEnabled = flamegpu::io::Telemetry::isEnabled();
     // Disable telemetry for simulation / ensemble objects in the test suite.
     flamegpu::io::Telemetry::disable();
@@ -22,16 +23,9 @@ GTEST_API_ int main(int argc, char **argv) {
     printf("Running main() from %s\n", __FILE__);
     testing::InitGoogleTest(&argc, argv);
     auto rtn = RUN_ALL_TESTS();
-    // Reset all cuda devices for memcheck / profiling purposes.
-    int devices = 0;
-    gpuErrchk(cudaGetDeviceCount(&devices));
-    if (devices > 0) {
-        for (int device = 0; device < devices; ++device) {
-            gpuErrchk(cudaSetDevice(device));
-            gpuErrchk(cudaDeviceReset());
-        }
-    }
-    // If there were more than 1 tests selected, (to exlcude bad filters and FLAMEGPU_USE_GTEST_DISCOVER related spam)
+    // Reset all cuda devices for memcheck / profiling purposes (and finalize MPI)
+    flamegpu::util::cleanup();
+    // If there were more than 1 tests selected, (to exclude bad filters and FLAMEGPU_USE_GTEST_DISCOVER related spam)
     if (telemetryEnabled && ::testing::UnitTest::GetInstance()->test_to_run_count() > 1) {
         // Detect if -v / --verbose was passed to the test suite binary to log the payload command
         bool verbose = false;
diff --git a/tests/test_cases/simulation/test_mpi_ensemble.cu b/tests/test_cases/simulation/test_mpi_ensemble.cu
index 47b9b56a4..f4af5dfe5 100644
--- a/tests/test_cases/simulation/test_mpi_ensemble.cu
+++ b/tests/test_cases/simulation/test_mpi_ensemble.cu
@@ -29,6 +29,12 @@ FLAMEGPU_STEP_FUNCTION(model_step) {
     FLAMEGPU->environment.setProperty<int>("counter", counter);
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
 }
+FLAMEGPU_STEP_FUNCTION(throw_exception) {
+    const int counter = FLAMEGPU->environment.getProperty<int>("counter");
+    if (FLAMEGPU->getStepCounter() == 1 && counter == 12) {
+        throw flamegpu::exception::VersionMismatch("Counter - %d", counter);
+    }
+}
 class TestMPIEnsemble : public testing::Test {
  protected:
     void SetUp() override {
@@ -56,16 +62,15 @@ class TestMPIEnsemble : public testing::Test {
         MPI_Comm group;
         MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &group);
         MPI_Comm_size(group, &group_size);
-        if (group_size != world_size) {
-            // Multi node MPI run
-            GTEST_SKIP() << "Skipping single-node MPI test, world size exceeds shared memory split size.";
-            return;
+        if (world_size < 1) {
+            GTEST_SKIP() << "world_size<1, something went wrong.";
         }
         initModel();
         initPlans();
         initExitLoggingConfig();
     }
     void TearDown() override {
+        if (ensemble) delete ensemble;
         if (exit_log_cfg) delete exit_log_cfg;
         if (plans) delete plans;
         if (model) delete model;
@@ -72,8 +77,7 @@
     }
     void initModel() {
         model = new flamegpu::ModelDescription("MPITest");
         model->Environment().newProperty<int>("counter", -1);
-        model->Environment().newProperty<int>("counter_init", -1);
         model->newAgent("agent");
         model->newLayer().addHostFunction(model_step);
     }
@@ -84,12 +88,43 @@
         plans = new flamegpu::RunPlanVector(*model, 100);
         plans->setSteps(10);
         plans->setPropertyLerpRange<int>("counter", 0, 99);
-        plans->setPropertyLerpRange<int>("counter_init", 0, 99);
     }
     void initExitLoggingConfig() {
         exit_log_cfg = new flamegpu::LoggingConfig(*model);
         exit_log_cfg->logEnvironment<int>("counter");
-        exit_log_cfg->logEnvironment<int>("counter_init");
+    }
+    void initEnsemble() {
+        ensemble = new flamegpu::CUDAEnsemble(*model);
+        ensemble->Config().concurrent_runs = 1;
+        if (group_size == world_size) {
+            // Single node MPI run
+            int deviceCount = 0;
+            gpuErrchk(cudaGetDeviceCount(&deviceCount));
+            if (deviceCount < world_size) {
+                MPI_Finalize();
+                GTEST_SKIP() << "Skipping single-node MPI test, world size (" << world_size << ") exceeds GPU count (" << deviceCount << "), this would cause test to stall.";
+            }
+            ensemble->Config().concurrent_runs = 1;
+            if (world_size > 1) {
+                ensemble->Config().devices = {world_rank};
+            }
+        }
+        ensemble->setExitLog(*exit_log_cfg);
+    }
+    void validateLogs() {
+        // Validate results
+        // @note Best we can currently do is check logs of each runner have correct results
+        // @note Ideally we'd validate between nodes to ensure all runs have been completed
+        const std::map<unsigned int, RunLog> logs = ensemble->getLogs();
+        for (const auto &[index, log] : logs) {
+            const ExitLogFrame& exit_log = log.getExitLog();
+            // EXPECT_EQ(exit_log.getStepCount(), 10);
+            if (exit_log.getStepCount()) {  // Temp, currently every runner gets all logs but unhandled ones are empty
+                // Get a logged environment property
+                const int counter = exit_log.getEnvironmentProperty<int>("counter");
+                EXPECT_EQ(counter, index + 10);
+            }
+        }
     }
     int world_rank = -1;
     int group_size = -1;
@@ -97,91 +132,109 @@
     flamegpu::ModelDescription *model = nullptr;
     flamegpu::RunPlanVector *plans = nullptr;
     flamegpu::LoggingConfig *exit_log_cfg = nullptr;
+    flamegpu::CUDAEnsemble *ensemble = nullptr;
 };
-TEST_F(TestMPIEnsemble, local) {
-    if (world_size < 1) return;
-    if (group_size != world_size) {
-        // Multi node MPI run
-        GTEST_SKIP() << "Skipping single-node MPI test, world size exceeds shared memory split size.";
-        return;
-    }
-    // Single node MPI run
-    int deviceCount = 0;
-    gpuErrchk(cudaGetDeviceCount(&deviceCount));
-    if (deviceCount < world_size) {
-        MPI_Finalize();
-        GTEST_SKIP() << "Skipping single-node MPI test, world size (" << world_size << ") exceeds GPU count (" << deviceCount << "), this would cause test to stall.";
+TEST_F(TestMPIEnsemble, success) {
+    initEnsemble();
+    const unsigned int successful_runs = ensemble->simulate(*plans);
+    EXPECT_EQ(successful_runs, 100u);
+    // Get stderr and stdout
+    std::string output = testing::internal::GetCapturedStdout();
+    std::string errors = testing::internal::GetCapturedStderr();
+    // Expect no warnings (stderr) but outputs on progress and timing
+    if (world_rank == 0) {
+        EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos);   // E.g. CUDAEnsemble progress: 1/2
+        EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos);  // E.g. CUDAEnsemble completed 2 runs successfully!
+        EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos);   // E.g. Ensemble time elapsed: 0.006000s
+        EXPECT_TRUE(errors.empty());
+    } else {
+        EXPECT_TRUE(output.empty());
+        EXPECT_TRUE(errors.empty());
     }
-    flamegpu::CUDAEnsemble ensemble(*model);
-    ensemble.Config().concurrent_runs = 1;
-    if (world_size > 1) {
-        ensemble.Config().devices = {world_rank};
-    }
-    ensemble.setExitLog(*exit_log_cfg);
-
-    ensemble.simulate(*plans);
-    // Validate results
-    // @note Best we can currently do is check logs of each runner have correct results
-    // @note Ideally we'd validate between nodes to ensure all runs have been completed
-    const std::map<unsigned int, RunLog> logs = ensemble.getLogs();
-    for (const auto &[_, log] : logs) {
-        const ExitLogFrame& exit_log = log.getExitLog();
-        EXPECT_EQ(exit_log.getStepCount(), 10);
-        // Get a logged environment property
-        const int counter = exit_log.getEnvironmentProperty<int>("counter");
-        const int counter_init = exit_log.getEnvironmentProperty<int>("counter_init");  // @todo Can't access run index via RunLog
-        EXPECT_EQ(counter, counter_init + 10);
-    }
+    validateLogs();
 }
-TEST_F(TestMPIEnsemble, multi) {
-    if (world_size < 1) return;
-    if (group_size == world_size) {
-        // Single node MPI run
-        GTEST_SKIP() << "Skipping multi-node MPI test, world size equals shared memory split size.";
-        return;
-    }
-    // Multi node MPI run
-    if (world_size < 1) return;
-    if (group_size == world_size) {
-        // Single node MPI run
-        GTEST_SKIP() << "Skipping multi-node MPI test, world size equals shared memory split size.";
-        return;
-    }
-    // Multi node MPI run
-    int deviceCount = 0;
-    gpuErrchk(cudaGetDeviceCount(&deviceCount));
-    if (deviceCount < group_size) {
-        MPI_Finalize();
-        GTEST_SKIP() << "Skipping multi-node MPI test on this node, group size (" << group_size << ") exceeds GPU count (" << deviceCount << "), this would cause test to stall.";
-    }
-    flamegpu::CUDAEnsemble ensemble(*model);
-    ensemble.Config().concurrent_runs = 1;
-    if (group_size > 1) {
-        ensemble.Config().devices = {world_rank};
+TEST_F(TestMPIEnsemble, error_off) {
+    model->newLayer().addHostFunction(throw_exception);
+    initEnsemble();
+    ensemble->Config().error_level = CUDAEnsemble::EnsembleConfig::Off;
+    unsigned int successful_runs = 0;
+    EXPECT_NO_THROW(successful_runs = ensemble->simulate(*plans));
+    // With error off, we would expect to see run index 12 fail
+    // Therefore 99 returned instead of 100
+    EXPECT_EQ(successful_runs, 99u);
+    // Get stderr and stdout
+    std::string output = testing::internal::GetCapturedStdout();
+    std::string errors = testing::internal::GetCapturedStderr();
+    // Expect no warnings (stderr) but outputs on progress and timing
+    if (world_rank == 0) {
+        EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos);   // E.g. CUDAEnsemble progress: 1/2
+        EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos);  // E.g. CUDAEnsemble completed 2 runs successfully!
+        EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos);   // E.g. Ensemble time elapsed: 0.006000s
+        EXPECT_TRUE(errors.empty());
+    } else {
+        EXPECT_TRUE(output.empty());
+        EXPECT_TRUE(errors.empty());
     }
-    ensemble.setExitLog(*exit_log_cfg);
-    ensemble.simulate(*plans);
+    // Existing logs should still validate
+    validateLogs();
+}
+TEST_F(TestMPIEnsemble, error_slow) {
+    model->newLayer().addHostFunction(throw_exception);
+    initEnsemble();
+    ensemble->Config().error_level = CUDAEnsemble::EnsembleConfig::Slow;
+    EXPECT_THROW(ensemble->simulate(*plans), flamegpu::exception::EnsembleError);
+    // @todo can't capture total number of successful/failed runs
+    // Get stderr and stdout
+    std::string output = testing::internal::GetCapturedStdout();
+    std::string errors = testing::internal::GetCapturedStderr();
+    // Expect no warnings (stderr) but outputs on progress and timing
+    if (world_rank == 0) {
+        EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos);   // E.g. CUDAEnsemble progress: 1/2
+        EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos);  // E.g. CUDAEnsemble completed 2 runs successfully!
+        EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos);   // E.g. Ensemble time elapsed: 0.006000s
+        EXPECT_TRUE(errors.empty());
+    } else {
+        EXPECT_TRUE(output.empty());
+        EXPECT_TRUE(errors.empty());
+    }
-    // Validate results
-    // @note Best we can currently do is check logs of each runner have correct results
-    // @note Ideally we'd validate between nodes to ensure all runs have been completed
-    const std::map<unsigned int, RunLog> logs = ensemble.getLogs();
-    for (const auto &[_, log] : logs) {
-        const ExitLogFrame& exit_log = log.getExitLog();
-        // EXPECT_EQ(exit_log.getStepCount(), 10);
-        if (exit_log.getStepCount()) {  // Temp, currently every runner gets all logs but unhandled ones are empty
-            // Get a logged environment property
-            const int counter = exit_log.getEnvironmentProperty<int>("counter");
-            const int counter_init = exit_log.getEnvironmentProperty<int>("counter_init");  // @todo Can't access run index via RunLog
-            EXPECT_EQ(counter, counter_init + 10);
-        }
-    }
+    // Existing logs should still validate
+    validateLogs();
 }
+TEST_F(TestMPIEnsemble, error_fast) {
+    model->newLayer().addHostFunction(throw_exception);
+    initEnsemble();
+    ensemble->Config().error_level = CUDAEnsemble::EnsembleConfig::Fast;
+    EXPECT_THROW(ensemble->simulate(*plans), flamegpu::exception::EnsembleError);
+    // @todo can't capture total number of successful/failed runs
+    // Get stderr and stdout
+    std::string output = testing::internal::GetCapturedStdout();
+    std::string errors = testing::internal::GetCapturedStderr();
+    // Expect no warnings (stderr) but outputs on progress and timing
+    if (world_rank == 0) {
+        EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos);   // E.g. CUDAEnsemble progress: 1/2
+        EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos);  // E.g. CUDAEnsemble completed 2 runs successfully!
+        EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos);   // E.g. Ensemble time elapsed: 0.006000s
+        EXPECT_TRUE(errors.empty());
+    } else {
+        EXPECT_TRUE(output.empty());
+        EXPECT_TRUE(errors.empty());
+    }
+
+    // Existing logs should still validate
+    validateLogs();
 }
 #else
-TEST(TestMPIEnsemble, DISABLED_local_test) { }
-TEST(TestMPIEnsemble, DISABLED_multi_test) { }
+TEST(TestMPIEnsemble, DISABLED_success) { }
+TEST(TestMPIEnsemble, DISABLED_error_off) { }
+TEST(TestMPIEnsemble, DISABLED_error_slow) { }
+TEST(TestMPIEnsemble, DISABLED_error_fast) { }
 #endif
 }  // namespace test_mpi_ensemble
diff --git a/tests/test_cases/util/test_cleanup.cu b/tests/test_cases/util/test_cleanup.cu
index 0bbe42c8c..a9c8580eb 100644
--- a/tests/test_cases/util/test_cleanup.cu
+++ b/tests/test_cases/util/test_cleanup.cu
@@ -26,7 +26,7 @@ FLAMEGPU_AGENT_FUNCTION(alive, MessageNone, MessageNone) {
 
 // Test the getting of a device's compute capability.
 TEST(TestCleanup, Explicit) {
-    // Allocate some arbitraty device memory.
+    // Allocate some arbitrary device memory.
     int * d_int = nullptr;
     gpuErrchk(cudaMalloc(&d_int, sizeof(int)));
     // Validate that the ptr is a valid device pointer