From cc9e9fc77afd87df3f4e4b34cfc5ece245650d1c Mon Sep 17 00:00:00 2001 From: Robert Chisholm Date: Fri, 28 Jul 2023 14:09:39 +0100 Subject: [PATCH] MPI ensemble telemetry --- include/flamegpu/simulation/CUDAEnsemble.h | 2 + src/flamegpu/simulation/CUDAEnsemble.cu | 59 +++++++++++++++++++++- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/include/flamegpu/simulation/CUDAEnsemble.h b/include/flamegpu/simulation/CUDAEnsemble.h index c45b1b961..4703051bf 100644 --- a/include/flamegpu/simulation/CUDAEnsemble.h +++ b/include/flamegpu/simulation/CUDAEnsemble.h @@ -33,6 +33,8 @@ class CUDAEnsemble { // Sent from worker to manager to report an error during job execution // If fail fast is enabled, following RequestJob will receive an exit job id (>=plans.size()) ReportError = 2, + // Sent from worker to manager to report GPUs for telemetry + TelemetryDevices = 3, }; #endif /** diff --git a/src/flamegpu/simulation/CUDAEnsemble.cu b/src/flamegpu/simulation/CUDAEnsemble.cu index 0ded6e2a8..b987896f5 100644 --- a/src/flamegpu/simulation/CUDAEnsemble.cu +++ b/src/flamegpu/simulation/CUDAEnsemble.cu @@ -66,7 +66,6 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) { #ifdef FLAMEGPU_ENABLE_MPI int world_rank = -1; int world_size = -1; - int finalize_size = -1; if (config.mpi) { int flag = 0; // MPI can only be init once, for certain test cases we do some initial MPI comms for setup @@ -493,9 +492,57 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) { } #ifdef FLAMEGPU_ENABLE_MPI + std::string remote_device_names; if (config.mpi) { // Ensure all workers have finished before exit MPI_Barrier(MPI_COMM_WORLD); + if (config.telemetry) { + // All ranks should notify rank 0 of their GPU devices + if (world_rank == 0) { + int bufflen = 2048; + char* buff = malloc(bufflen); + for (int i = 1; i < world_size; ++i) { + // Receive a message from each rank + // Check whether MPI runners have reported an error + MPI_Status status; + memset(&status, 0, sizeof(MPI_Status)); + MPI_Iprobe( + MPI_ANY_SOURCE, // int source + EnvelopeTag::TelemetryDevices, // int tag + MPI_COMM_WORLD, // MPI_Comm communicator + &status); + int strlen = 0; + MPI_Get_count(&status, MPI_CHAR, &strlen); + if (strlen > bufflen) { + free(buff); + bufflen = 2 * strlen; + buff = malloc(bufflen); + } + MPI_Recv( + buff, // void* data + strlen, // int count + MPI_CHAR, // MPI_Datatype datatype (can't use MPI_DATATYPE_NULL) + MPI_ANY_SOURCE, // int source + EnvelopeTag::TelemetryDevices, // int tag + MPI_COMM_WORLD, // MPI_Comm communicator + &status); // MPI_Status* + remote_device_names.append(", "); + remote_device_names.append(buff); + } + free(buff); + } else { + const std::string d_string = flamegpu::detail::compute_capability::getDeviceNames(config.devices); + // Send GPU count + MPI_Send( + d_string.c_str(), // void* data + d_string.length() + 1, // int count + MPI_CHAR, // MPI_Datatype datatype + 0, // int destination + EnvelopeTag::TelemetryDevices, // int tag + MPI_COMM_WORLD); // MPI_Comm communicator + } + } + MPI_Barrier(MPI_COMM_WORLD); } #endif // Record and store the elapsed time @@ -520,11 +567,19 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) { if (config.telemetry && (!config.mpi || world_rank == 0)) { // Generate some payload items std::map payload_items; - payload_items["GPUDevices"] = flamegpu::detail::compute_capability::getDeviceNames(config.devices); +#ifndef FLAMEGPU_ENABLE_MPI + payload_items["GPUDevices"] = flamegpu::detail::compute_capability::getDeviceNames(config.devices); +#else + payload_items["GPUDevices"] = flamegpu::detail::compute_capability::getDeviceNames(config.devices) + remote_device_names; +#endif payload_items["SimTime(s)"] = std::to_string(ensemble_elapsed_time); #if defined(__CUDACC_VER_MAJOR__) && defined(__CUDACC_VER_MINOR__) && defined(__CUDACC_VER_PATCH__) payload_items["NVCCVersion"] = std::to_string(__CUDACC_VER_MAJOR__) + "." + std::to_string(__CUDACC_VER_MINOR__) + "." + std::to_string(__CUDACC_VER_BUILD__); #endif + payload_items["mpi"] = config.mpi ? "0" : "1"; +#ifdef FLAMEGPU_ENABLE_MPI + payload_items["mpi_world_size"] = std::to_string(world_size); +#endif // generate telemetry data std::string telemetry_data = flamegpu::io::Telemetry::generateData("ensemble-run", payload_items, isSWIG); // send the telemetry packet