draft: move MPI_Finalize() to util::cleanup()
Associated test changes.

Need to test on waimu
Robadob committed Jul 27, 2023
1 parent 1dcfaff commit 43a3b34
Showing 6 changed files with 158 additions and 96 deletions.
2 changes: 1 addition & 1 deletion include/flamegpu/util/cleanup.h
@@ -9,7 +9,7 @@ namespace flamegpu {
namespace util {

/**
* Method to cleanup / finalise use of FLAMEGPU.
* Method to cleanup / finalise use of FLAMEGPU (and MPI).
* For CUDA implementations, this resets all CUDA devices in the current system, to ensure a clean device state for CUDA tools such as cuda-memcheck, compute-sanitizer and Nsight Compute.
*
* This method should ideally be called as the final method prior to an `exit` or the `return` of the main method, as it is costly and can invalidate device memory allocations, potentially breaking application state.
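To make the documented call pattern concrete, here is a minimal usage sketch (not part of this commit; the umbrella include and the model/ensemble code are assumed, and only the final cleanup() call is the point being illustrated):

#include "flamegpu/flamegpu.h"

int main(int argc, const char **argv) {
    // ... build a ModelDescription, run a CUDASimulation or CUDAEnsemble ...
    // Last call before returning: resets all CUDA devices and, in
    // FLAMEGPU_ENABLE_MPI builds, finalises MPI if it was initialised.
    flamegpu::util::cleanup();
    return 0;
}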
3 changes: 2 additions & 1 deletion src/flamegpu/simulation/CUDAEnsemble.cu
@@ -490,7 +490,8 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {

#ifdef FLAMEGPU_ENABLE_MPI
if (config.mpi) {
MPI_Finalize();
// Ensure all workers have finished before exit
MPI_Barrier(MPI_COMM_WORLD);
}
#endif
// Record and store the elapsed time
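With MPI_Finalize() removed from simulate(), ranks now only synchronise at the end of the ensemble and finalisation is deferred to util::cleanup(). A rough sketch of the intended call order from user code (a hypothetical snippet, assuming an MPI-enabled build with the `mpi` config flag referenced by `config.mpi` above, and pre-existing `model` and `plans` objects):

flamegpu::CUDAEnsemble ensemble(model);
ensemble.Config().mpi = true;   // MPI ensemble mode (only meaningful when built with FLAMEGPU_ENABLE_MPI)
ensemble.simulate(plans);       // now ends with MPI_Barrier(MPI_COMM_WORLD) rather than MPI_Finalize()
// ... further MPI work or additional ensembles remain possible here ...
flamegpu::util::cleanup();      // MPI_Finalize() now happens here, at most once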
14 changes: 14 additions & 0 deletions src/flamegpu/util/cleanup.cu
@@ -1,5 +1,9 @@
#include "flamegpu/util/cleanup.h"

#ifdef FLAMEGPU_ENABLE_MPI
#include <mpi.h>
#endif

#include <cuda_runtime.h>
#include "flamegpu/simulation/detail/CUDAErrorChecking.cuh"
#include "flamegpu/detail/JitifyCache.h"
@@ -8,6 +12,16 @@ namespace flamegpu {
namespace util {

void cleanup() {
#ifdef FLAMEGPU_ENABLE_MPI
int init_flag = 0;
int fin_flag = 0;
// MPI may only be initialised and finalised once per process
MPI_Initialized(&init_flag);
MPI_Finalized(&fin_flag);
if (init_flag && !fin_flag) {
MPI_Finalize();
}
#endif
int originalDevice = 0;
gpuErrchk(cudaGetDevice(&originalDevice));
// Reset all cuda devices for memcheck / profiling purposes.
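Because the MPI_Initialized()/MPI_Finalized() guard makes finalisation idempotent, repeated cleanup() calls, or calls in runs where MPI was never initialised, should be harmless. Illustrative sketch only:

flamegpu::util::cleanup();  // finalises MPI here if it was initialised and not yet finalised
flamegpu::util::cleanup();  // second call: MPI_Finalized() reports true, so MPI is left untouched
                            // (the CUDA device reset still runs again, which is safe but costly)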
16 changes: 5 additions & 11 deletions tests/helpers/main.cu
@@ -7,10 +7,11 @@
#include "helpers/device_initialisation.h"
#include "flamegpu/io/Telemetry.h"
#include "flamegpu/detail/TestSuiteTelemetry.h"
#include "flamegpu/util/cleanup.h"


GTEST_API_ int main(int argc, char **argv) {
// Get the current status of telemetry, to control if test suite results shold be submit or not
// Get the current status of telemetry, to control whether test suite results should be submitted or not
const bool telemetryEnabled = flamegpu::io::Telemetry::isEnabled();
// Disable telemetry for simulation / ensemble objects in the test suite.
flamegpu::io::Telemetry::disable();
@@ -22,16 +23,9 @@ GTEST_API_ int main(int argc, char **argv) {
printf("Running main() from %s\n", __FILE__);
testing::InitGoogleTest(&argc, argv);
auto rtn = RUN_ALL_TESTS();
// Reset all cuda devices for memcheck / profiling purposes.
int devices = 0;
gpuErrchk(cudaGetDeviceCount(&devices));
if (devices > 0) {
for (int device = 0; device < devices; ++device) {
gpuErrchk(cudaSetDevice(device));
gpuErrchk(cudaDeviceReset());
}
}
// If there were more than 1 tests selected, (to exlcude bad filters and FLAMEGPU_USE_GTEST_DISCOVER related spam)
// Reset all cuda devices for memcheck / profiling purposes (and finalize MPI)
flamegpu::util::cleanup();
// If more than one test was selected (to exclude bad filters and FLAMEGPU_USE_GTEST_DISCOVER related spam)
if (telemetryEnabled && ::testing::UnitTest::GetInstance()->test_to_run_count() > 1) {
// Detect if -v / --verbose was passed to the test suite binary to log the payload command
bool verbose = false;
217 changes: 135 additions & 82 deletions tests/test_cases/simulation/test_mpi_ensemble.cu
@@ -29,6 +29,12 @@ FLAMEGPU_STEP_FUNCTION(model_step) {
FLAMEGPU->environment.setProperty<int>("counter", counter);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
FLAMEGPU_STEP_FUNCTION(throw_exception) {
const int counter = FLAMEGPU->environment.getProperty<int>("counter");
if (FLAMEGPU->getStepCounter() == 1 && counter == 12) {
throw flamegpu::exception::VersionMismatch("Counter - %d", counter);
}
}
class TestMPIEnsemble : public testing::Test {
protected:
void SetUp() override {
@@ -56,24 +62,22 @@ class TestMPIEnsemble : public testing::Test {
MPI_Comm group;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &group);
MPI_Comm_size(group, &group_size);
if (group_size != world_size) {
// Multi node MPI run
GTEST_SKIP() << "Skipping single-node MPI test, world size exceeds shared memory split size.";
return;
if (world_size < 1) {
GTEST_SKIP() << "world_size<1, something went wrong.";
}
initModel();
initPlans();
initExitLoggingConfig();
}
void TearDown() override {
if (ensemble) delete ensemble;
if (exit_log_cfg) delete exit_log_cfg;
if (plans) delete plans;
if (model) delete model;
}
void initModel() {
model = new flamegpu::ModelDescription("MPITest");
model->Environment().newProperty<int>("counter", -1);
model->Environment().newProperty<int>("counter_init", -1);
model->newAgent("agent");
model->newLayer().addHostFunction(model_step);
}
@@ -84,104 +88,153 @@ class TestMPIEnsemble : public testing::Test {
plans = new flamegpu::RunPlanVector(*model, 100);
plans->setSteps(10);
plans->setPropertyLerpRange<int>("counter", 0, 99);
plans->setPropertyLerpRange<int>("counter_init", 0, 99);
}
void initExitLoggingConfig() {
exit_log_cfg = new flamegpu::LoggingConfig(*model);
exit_log_cfg->logEnvironment("counter");
exit_log_cfg->logEnvironment("counter_init");
}
void initEnsemble() {
ensemble = new flamegpu::CUDAEnsemble (*model);
ensemble->Config().concurrent_runs = 1;
if (group_size == world_size) {
// Single node MPI run
int deviceCount = 0;
gpuErrchk(cudaGetDeviceCount(&deviceCount));
if (deviceCount < world_size) {
MPI_Finalize();
GTEST_SKIP() << "Skipping single-node MPI test, world size (" << world_size << ") exceeds GPU count (" << deviceCount << "), this would cause test to stall.";
}
ensemble->Config().concurrent_runs = 1;
if (world_size > 1) {
ensemble->Config().devices = {world_rank};
}
}
ensemble->setExitLog(*exit_log_cfg);
}
void validateLogs() {
// Validate results
// @note Best we can currently do is check logs of each runner have correct results
// @note Ideally we'd validate between nodes to ensure all runs have been completed
const std::map<unsigned int, RunLog> logs = ensemble->getLogs();
for (const auto &[index, log] : logs) {
const ExitLogFrame& exit_log = log.getExitLog();
// EXPECT_EQ(exit_log.getStepCount(), 10);
if (exit_log.getStepCount()) { // Temp, currently every runner gets all logs but unhandled ones are empty
// Get a logged environment property
const int counter = exit_log.getEnvironmentProperty<int>("counter");
EXPECT_EQ(counter, index + 10);
}
}
}
int world_rank = -1;
int group_size = -1;
int world_size = -1;
flamegpu::ModelDescription *model = nullptr;
flamegpu::RunPlanVector *plans = nullptr;
flamegpu::LoggingConfig *exit_log_cfg = nullptr;
flamegpu::CUDAEnsemble *ensemble = nullptr;
};
TEST_F(TestMPIEnsemble, local) {
if (world_size < 1) return;
if (group_size != world_size) {
// Multi node MPI run
GTEST_SKIP() << "Skipping single-node MPI test, world size exceeds shared memory split size.";
return;
}
// Single node MPI run
int deviceCount = 0;
gpuErrchk(cudaGetDeviceCount(&deviceCount));
if (deviceCount < world_size) {
MPI_Finalize();
GTEST_SKIP() << "Skipping single-node MPI test, world size (" << world_size << ") exceeds GPU count (" << deviceCount << "), this would cause test to stall.";
TEST_F(TestMPIEnsemble, success) {
initEnsemble();
const unsigned int successful_runs = ensemble->simulate(*plans);
EXPECT_EQ(successful_runs, 100u);
// Get stderr and stdout
std::string output = testing::internal::GetCapturedStdout();
std::string errors = testing::internal::GetCapturedStderr();
// Expect no warnings (stderr) but outputs on progress and timing
if (world_rank == 0) {
EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos); // E.g. CUDAEnsemble progress: 1/2
EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos); // E.g. CUDAEnsemble completed 2 runs successfully!
EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos); // E.g. Ensemble time elapsed: 0.006000s
EXPECT_TRUE(errors.empty());
} else {
EXPECT_TRUE(output.empty());
EXPECT_TRUE(errors.empty());
}
flamegpu::CUDAEnsemble ensemble(*model);
ensemble.Config().concurrent_runs = 1;
if (world_size > 1) {
ensemble.Config().devices = {world_rank};
}
ensemble.setExitLog(*exit_log_cfg);

ensemble.simulate(*plans);

// Validate results
// @note Best we can currently do is check logs of each runner have correct results
// @note Ideally we'd validate between nodes to ensure all runs have been completed
const std::map<unsigned int, RunLog> logs = ensemble.getLogs();
for (const auto &[_, log] : logs) {
const ExitLogFrame& exit_log = log.getExitLog();
EXPECT_EQ(exit_log.getStepCount(), 10);
// Get a logged environment property
const int counter = exit_log.getEnvironmentProperty<int>("counter");
const int counter_init = exit_log.getEnvironmentProperty<int>("counter_init"); // @todo Can't access run index via RunLog
EXPECT_EQ(counter, counter_init + 10);
}
validateLogs();
}
TEST_F(TestMPIEnsemble, multi) {
if (world_size < 1) return;
if (group_size == world_size) {
// Single node MPI run
GTEST_SKIP() << "Skipping multi-node MPI test, world size equals shared memory split size.";
return;
}
// Multi node MPI run
int deviceCount = 0;
gpuErrchk(cudaGetDeviceCount(&deviceCount));
if (deviceCount < group_size) {
MPI_Finalize();
GTEST_SKIP() << "Skipping multi-node MPI test on this node, group size (" << group_size << ") exceeds GPU count (" << deviceCount << "), this would cause test to stall.";
}
flamegpu::CUDAEnsemble ensemble(*model);
ensemble.Config().concurrent_runs = 1;
if (group_size > 1) {
ensemble.Config().devices = {world_rank};
TEST_F(TestMPIEnsemble, error_off) {
model->newLayer().addHostFunction(throw_exception);
initEnsemble();
ensemble->Config().error_level = CUDAEnsemble::EnsembleConfig::Off;
unsigned int successful_runs = 0;
EXPECT_NO_THROW(successful_runs = ensemble->simulate(*plans));
// With error off, we would expect to see run index 12 fail
// Therefore 99 returned instead of 100
EXPECT_EQ(successful_runs, 99u);
// Get stderr and stdout
std::string output = testing::internal::GetCapturedStdout();
std::string errors = testing::internal::GetCapturedStderr();
// Expect no warnings (stderr) but outputs on progress and timing
if (world_rank == 0) {
EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos); // E.g. CUDAEnsemble progress: 1/2
EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos); // E.g. CUDAEnsemble completed 2 runs successfully!
EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos); // E.g. Ensemble time elapsed: 0.006000s
EXPECT_TRUE(errors.empty());
} else {
EXPECT_TRUE(output.empty());
EXPECT_TRUE(errors.empty());
}
ensemble.setExitLog(*exit_log_cfg);

ensemble.simulate(*plans);
// Existing logs should still validate
validateLogs();
}
TEST_F(TestMPIEnsemble, error_slow) {
model->newLayer().addHostFunction(throw_exception);
initEnsemble();
ensemble->Config().error_level = CUDAEnsemble::EnsembleConfig::Slow;
EXPECT_THROW(ensemble->simulate(*plans), flamegpu::exception::EnsembleError);
// @todo can't capture total number of successful/failed runs
// Get stderr and stdout
std::string output = testing::internal::GetCapturedStdout();
std::string errors = testing::internal::GetCapturedStderr();
// Expect no warnings (stderr) but outputs on progress and timing
if (world_rank == 0) {
EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos); // E.g. CUDAEnsemble progress: 1/2
EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos); // E.g. CUDAEnsemble completed 2 runs successfully!
EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos); // E.g. Ensemble time elapsed: 0.006000s
EXPECT_TRUE(errors.empty());
} else {
EXPECT_TRUE(output.empty());
EXPECT_TRUE(errors.empty());
}

// Validate results
// @note Best we can currently do is check logs of each runner have correct results
// @note Ideally we'd validate between nodes to ensure all runs have been completed
const std::map<unsigned int, RunLog> logs = ensemble.getLogs();
for (const auto &[_, log] : logs) {
const ExitLogFrame& exit_log = log.getExitLog();
// EXPECT_EQ(exit_log.getStepCount(), 10);
if (exit_log.getStepCount()) { // Temp, currently every runner gets all logs but unhandled ones are empty
// Get a logged environment property
const int counter = exit_log.getEnvironmentProperty<int>("counter");
const int counter_init = exit_log.getEnvironmentProperty<int>("counter_init"); // @todo Can't access run index via RunLog
EXPECT_EQ(counter, counter_init + 10);
}
// Existing logs should still validate
validateLogs();
}
TEST_F(TestMPIEnsemble, error_fast) {
model->newLayer().addHostFunction(throw_exception);
initEnsemble();
ensemble->Config().error_level = CUDAEnsemble::EnsembleConfig::Fast;
EXPECT_THROW(ensemble->simulate(*plans), flamegpu::exception::EnsembleError);
// @todo can't capture total number of successful/failed runs
// Get stderr and stdout
std::string output = testing::internal::GetCapturedStdout();
std::string errors = testing::internal::GetCapturedStderr();
// Expect no warnings (stderr) but outputs on progress and timing
if (world_rank == 0) {
EXPECT_TRUE(output.find("CUDAEnsemble progress") != std::string::npos); // E.g. CUDAEnsemble progress: 1/2
EXPECT_TRUE(output.find("CUDAEnsemble completed") != std::string::npos); // E.g. CUDAEnsemble completed 2 runs successfully!
EXPECT_TRUE(output.find("Ensemble time elapsed") != std::string::npos); // E.g. Ensemble time elapsed: 0.006000s
EXPECT_TRUE(errors.empty());
} else {
EXPECT_TRUE(output.empty());
EXPECT_TRUE(errors.empty());
}

// Existing logs should still validate
validateLogs();
}
#else
TEST(TestMPIEnsemble, DISABLED_local_test) { }
TEST(TestMPIEnsemble, DISABLED_multi_test) { }
TEST(TestMPIEnsemble, DISABLED_success) { }
TEST(TestMPIEnsemble, DISABLED_error_off) { }
TEST(TestMPIEnsemble, DISABLED_error_slow) { }
TEST(TestMPIEnsemble, DISABLED_error_fast) { }
#endif

} // namespace test_mpi_ensemble
2 changes: 1 addition & 1 deletion tests/test_cases/util/test_cleanup.cu
@@ -26,7 +26,7 @@ FLAMEGPU_AGENT_FUNCTION(alive, MessageNone, MessageNone) {

// Test the getting of a device's compute capability.
TEST(TestCleanup, Explicit) {
// Allocate some arbitraty device memory.
// Allocate some arbitrary device memory.
int * d_int = nullptr;
gpuErrchk(cudaMalloc(&d_int, sizeof(int)));
// Validate that the ptr is a valid device pointer
