First draft of MPISimRunner
Does not currently support proper error handling (errors are caught and handled silently).

Need to test compilation with MPI and write a test.

Need to work out how we will write a test to divide up GPUs with local MPI.
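As a sketch of one common approach (not part of this commit), node-local ranks can be discovered with a shared-memory communicator and mapped onto the visible CUDA devices; assignDeviceByLocalRank below is a hypothetical helper, not FLAMEGPU API:

#include <mpi.h>
#include <cuda_runtime.h>

// Hypothetical helper (not in this commit): give each rank on a node its own GPU.
void assignDeviceByLocalRank() {
    // Split the world communicator into one communicator per physical node
    MPI_Comm local_comm;
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &local_comm);
    int local_rank = 0;
    MPI_Comm_rank(local_comm, &local_rank);
    // Map the node-local rank onto one of the node's visible CUDA devices
    int device_count = 0;
    cudaGetDeviceCount(&device_count);
    if (device_count > 0)
        cudaSetDevice(local_rank % device_count);
    MPI_Comm_free(&local_comm);
}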
Robadob committed Jul 20, 2023
1 parent 3446307 commit 0c876d0
Showing 18 changed files with 2,587 additions and 218 deletions.
29 changes: 29 additions & 0 deletions include/flamegpu/defines.h
@@ -13,6 +13,35 @@ typedef unsigned int id_t;
*/
constexpr const char* ID_VARIABLE_NAME = "_id";
/**
* Internal variable name used for source-dest pairs
* @note These are always stored in [dest, source] order, but shown to the user in [source, dest] order. This enables 2D sorting
*/
constexpr const char* GRAPH_SOURCE_DEST_VARIABLE_NAME = "_source_dest";
/**
* Internal variable name used for source-dest vertex ID pairs
* @note These are always stored in [dest, source] order, to match GRAPH_SOURCE_DEST_VARIABLE_NAME
*/
constexpr const char* GRAPH_SOURCE_DEST_ID_VARIABLE_NAME = "_source_dest_id";
/**
* Internal variable name used for the pointer to the PBM of edges leaving each vertex
*/
constexpr const char* GRAPH_VERTEX_PBM_VARIABLE_NAME = "_pbm";
/**
* Internal variable name used for the pointer to the (inverted) PBM of edges joining each vertex
*/
constexpr const char* GRAPH_VERTEX_IPBM_VARIABLE_NAME = "_ipbm";
/**
* Edges are not sorted in order of the IPBM; instead, the IPBM points to indices in this list of edge indexes
*/
constexpr const char* GRAPH_VERTEX_IPBM_EDGES_VARIABLE_NAME = "_ipbm_edges";
/**
* Accessing an ID within this buffer will return the index of the vertex
**/
constexpr const char* GRAPH_VERTEX_INDEX_MAP_VARIABLE_NAME = "_index_map";
/**
* Internal value used when IDs have not been set
* If this value is changed, things may break
*/
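The PBM/IPBM constants above name buffers for a CSR-style graph layout. As a minimal illustration of the assumed layout (not code from this commit), a PBM of vertex_count + 1 offsets can be walked as follows:

#include <cstdio>
#include <vector>

// Illustrative only: pbm[v]..pbm[v+1] bound the edges leaving vertex v, and
// edge_dest holds each edge's destination. The inverted PBM (ipbm) would index
// ipbm_edges, a list of edge indices grouped by destination vertex.
void printOutEdges(const std::vector<unsigned int> &pbm,
                   const std::vector<unsigned int> &edge_dest,
                   unsigned int vertex) {
    for (unsigned int e = pbm[vertex]; e < pbm[vertex + 1]; ++e)
        std::printf("edge %u: %u -> %u\n", e, vertex, edge_dest[e]);
}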

Large diffs are not rendered by default.

552 changes: 552 additions & 0 deletions include/flamegpu/runtime/environment/HostEnvironmentDirectedGraph.cuh

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions include/flamegpu/simulation/CUDAEnsemble.h
@@ -22,6 +22,15 @@ struct RunLog;
*/
class CUDAEnsemble {
public:
#ifdef FLAMEGPU_ENABLE_MPI
// Tags to differentiate the MPI messages used in the protocol
enum EnvelopeTag : int {
// Sent from worker to manager to request a job index to process
RequestJob = 0,
// Sent from manager to worker to assign a job index to process
AssignJob = 1
};
#endif
/**
* Execution config for running a CUDAEnsemble
*/
@@ -87,6 +96,14 @@
#else
const bool block_standby = false;
#endif
/**
* Allows MPI processing to be disabled for builds with MPI support
*/
#ifdef FLAMEGPU_ENABLE_MPI
bool mpi = true;
#else
const bool mpi = false;
#endif

bool telemetry = false;
};
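The EnvelopeTag values above suggest a simple request/assign protocol between worker ranks and the manager rank. A minimal worker-side sketch (payloads and ranks are assumptions, not code from this commit):

#include <mpi.h>

// Hypothetical worker-side exchange using the RequestJob/AssignJob tags.
// The payload contents and the manager's rank are illustrative assumptions.
unsigned int requestNextJob(int manager_rank) {
    // Notify the manager that this worker is idle (tag 0 == RequestJob)
    int dummy = 0;
    MPI_Send(&dummy, 1, MPI_INT, manager_rank, 0, MPI_COMM_WORLD);
    // Receive the index of the next RunPlan to execute (tag 1 == AssignJob)
    unsigned int job_index = 0;
    MPI_Recv(&job_index, 1, MPI_UNSIGNED, manager_rank, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    return job_index;
}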
6 changes: 3 additions & 3 deletions include/flamegpu/simulation/CUDASimulation.h
@@ -35,7 +35,7 @@

namespace flamegpu {
namespace detail {
class SimRunner;
class AbstractSimRunner;
class CUDAAgent;
class CUDAMessage;
} // namespace detail
@@ -60,7 +60,7 @@ class CUDASimulation : public Simulation {
/**
* Requires internal access to getCUDAAgent()
*/
friend class detail::SimRunner;
friend class detail::AbstractSimRunner;
friend class CUDAEnsemble;
#ifdef FLAMEGPU_VISUALISATION
/**
@@ -91,7 +91,7 @@
* CUDA runner specific config
*/
struct Config {
friend class detail::SimRunner;
friend class detail::AbstractSimRunner;
friend class CUDASimulation;
friend class HostAPI;
/**
4 changes: 2 additions & 2 deletions include/flamegpu/simulation/RunPlan.h
@@ -15,7 +15,7 @@

namespace flamegpu {
namespace detail {
class SimRunner;
class AbstractSimRunner;
}
class ModelDescription;
class RunPlanVector;
@@ -31,7 +31,7 @@ class XMLLogger;
*/
class RunPlan {
friend class RunPlanVector;
friend class detail::SimRunner;
friend class detail::AbstractSimRunner;
friend class CUDASimulation;
friend class io::JSONLogger;
friend class io::XMLLogger;
2 changes: 1 addition & 1 deletion include/flamegpu/simulation/RunPlanVector.h
@@ -25,7 +25,7 @@ class EnvironmentDescription;
*/
class RunPlanVector : private std::vector<RunPlan> {
friend class RunPlan;
friend class detail::SimRunner;
friend class detail::AbstractSimRunner;
friend unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans);

public:
166 changes: 166 additions & 0 deletions include/flamegpu/simulation/detail/AbstractSimRunner.h
@@ -0,0 +1,166 @@
#ifndef INCLUDE_FLAMEGPU_SIMULATION_DETAIL_ABSTRACTSIMRUNNER_H_
#define INCLUDE_FLAMEGPU_SIMULATION_DETAIL_ABSTRACTSIMRUNNER_H_

#include <string>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <vector>
#include <memory>

#include "flamegpu/defines.h"
#include "flamegpu/simulation/LogFrame.h"

namespace flamegpu {
struct ModelData;
class LoggingConfig;
class StepLoggingConfig;
class RunPlanVector;
class CUDAEnsemble;
namespace detail {
/**
* Common interface and implementation shared between SimRunner and MPISimRunner
*/
class AbstractSimRunner {
friend class flamegpu::CUDAEnsemble;

public:
struct ErrorDetail {
unsigned int run_id;
unsigned int device_id;
unsigned int runner_id;
std::string exception_string;
};

/**
* Constructor, creates and initialises the underlying thread
* @param _model A copy of the ModelDescription hierarchy for the RunPlanVector; this is used to create the CUDASimulation instances.
* @param _err_ct Reference to an atomic integer for tracking how many errors have occurred
* @param _next_run Atomic counter for safely selecting the next run plan to execute across multiple threads
* @param _plans The vector of run plans to be executed by the ensemble
* @param _step_log_config The config of which data should be logged each step
* @param _exit_log_config The config of which data should be logged at run exit
* @param _device_id The GPU that all runs should execute on
* @param _runner_id A unique index assigned to the runner
* @param _verbosity Verbosity level (Verbosity::Quiet, Verbosity::Default, Verbosity::Verbose)
* @param run_logs Reference to the vector to store generated run logs
* @param log_export_queue The queue of logs to be exported to disk
* @param log_export_queue_mutex This mutex must be locked to access log_export_queue
* @param log_export_queue_cdn The condition is notified every time a log has been added to the queue
* @param fast_err_detail Structure to store error details on fast failure for main thread rethrow
* @param _total_runners Total number of runners executing
* @param _isSWIG Flag denoting whether it's a Python build of FLAMEGPU
*/
AbstractSimRunner(const std::shared_ptr<const ModelData> _model,
std::atomic<unsigned int> &_err_ct,
std::atomic<unsigned int> &_next_run,
const RunPlanVector &_plans,
std::shared_ptr<const StepLoggingConfig> _step_log_config,
std::shared_ptr<const LoggingConfig> _exit_log_config,
int _device_id,
unsigned int _runner_id,
flamegpu::Verbosity _verbosity,
std::vector<RunLog> &run_logs,
std::queue<unsigned int> &log_export_queue,
std::mutex &log_export_queue_mutex,
std::condition_variable &log_export_queue_cdn,
ErrorDetail &fast_err_detail,
unsigned int _total_runners,
bool _isSWIG);
/**
* Virtual class requires polymorphic destructor
*/
virtual ~AbstractSimRunner() {}
/**
* Start executing the SimRunner in its separate thread
*/
virtual void start() = 0;
/**
* Blocking call which triggers thread->join() if thread->joinable()
*/
void join();

protected:
/**
* Create and execute the simulation for the RunPlan at the given index within plans
* @throws Exceptions raised during sim execution should be caught and handled by the caller
*/
void runSimulation(int plan_id);
/**
* The thread which the SimRunner executes on
*/
std::thread thread;
/**
* Each sim runner takes its own clone of the model description hierarchy, so it can manipulate the environment without conflict
*/
const std::shared_ptr<const ModelData> model;
/**
* CUDA Device index of runner
*/
const int device_id;
/**
* Per instance unique runner id
*/
const unsigned int runner_id;
/**
* Total number of runners executing
* This is used to calculate the progress on job completion
*/
const unsigned int total_runners;
/**
* Flag for whether to print progress
*/
const flamegpu::Verbosity verbosity;
// External references
/**
* Reference to an atomic integer for tracking how many errors have occurred
*/
std::atomic<unsigned int> &err_ct;
/**
* Atomic counter for safely selecting the next run plan to execute across multiple threads
* This is used differently by each class of runner
*/
std::atomic<unsigned int> &next_run;
/**
* Reference to the vector of run configurations to be executed
*/
const RunPlanVector &plans;
/**
* Config specifying which data to log per step
*/
const std::shared_ptr<const StepLoggingConfig> step_log_config;
/**
* Config specifying which data to log at run exit
*/
const std::shared_ptr<const LoggingConfig> exit_log_config;
/**
* Reference to the vector to store generated run logs
*/
std::vector<RunLog> &run_logs;
/**
* The queue of logs to be exported to disk
*/
std::queue<unsigned int> &log_export_queue;
/**
* This mutex must be locked to access log_export_queue
*/
std::mutex &log_export_queue_mutex;
/**
* The condition is notified every time a log has been added to the queue
*/
std::condition_variable &log_export_queue_cdn;
/**
* If fail_fast is true, error details will be stored here on failure so an exception can be thrown from the main thread
*/
ErrorDetail& fast_err_detail;
/**
* If true, the model is using the SWIG Python interface
**/
const bool isSWIG;
};

} // namespace detail
} // namespace flamegpu

#endif // INCLUDE_FLAMEGPU_SIMULATION_DETAIL_ABSTRACTSIMRUNNER_H_
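To illustrate how an ensemble might drive these runners (a sketch under assumptions; the actual CUDAEnsemble wiring lives in source files not rendered in this diff), the start()/join() pair maps onto a simple launch-then-wait loop; makeRunnerForDevice is a hypothetical factory, not part of this commit:

#include <memory>
#include <vector>
#include "flamegpu/simulation/detail/AbstractSimRunner.h"

// Hypothetical factory, declaration only: stands in for constructing a concrete
// runner (SimRunner or MPISimRunner) with the parameters documented above.
std::unique_ptr<flamegpu::detail::AbstractSimRunner> makeRunnerForDevice(int device_id);

// Launch one runner per selected device, then block until all have finished.
void runAll(const std::vector<int> &devices) {
    std::vector<std::unique_ptr<flamegpu::detail::AbstractSimRunner>> runners;
    for (const int device_id : devices) {
        runners.push_back(makeRunnerForDevice(device_id));
        runners.back()->start();   // spawns the runner's worker thread
    }
    for (auto &runner : runners)
        runner->join();            // joins the thread if it is joinable
}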
83 changes: 83 additions & 0 deletions include/flamegpu/simulation/detail/MPISimRunner.h
@@ -0,0 +1,83 @@
#ifndef INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPISIMRUNNER_H_
#define INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPISIMRUNNER_H_

#include <atomic>
#include <memory>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <thread>
#include <vector>
#include <string>

#include "flamegpu/simulation/detail/AbstractSimRunner.h"
#include "flamegpu/defines.h"
#include "flamegpu/simulation/LogFrame.h"

namespace flamegpu {
struct ModelData;
class LoggingConfig;
class StepLoggingConfig;
class RunPlanVector;
class CUDAEnsemble;
namespace detail {

/**
* A thread class which executes RunPlans on a single GPU, communicating with the main thread, which allocates jobs via MPI
*
* This class is used by CUDAEnsemble; it creates one SimRunner instance per GPU, each executing in a separate thread.
* There may be multiple instances per GPU, if running small models on large GPUs.
*/
class MPISimRunner : public AbstractSimRunner {
enum Signal : unsigned int {
// MPISimRunner sets this to notify the manager that it wants a new job
RequestJob = UINT_MAX,
RunFailed = UINT_MAX-1,
};

public:
/**
* Constructor, creates and initialises a new MPISimRunner
* @param _model A copy of the ModelDescription hierarchy for the RunPlanVector; this is used to create the CUDASimulation instances.
* @param _err_ct Reference to an atomic integer for tracking how many errors have occurred
* @param _next_run Atomic counter for safely selecting the next run plan to execute across multiple threads
* @param _plans The vector of run plans to be executed by the ensemble
* @param _step_log_config The config of which data should be logged each step
* @param _exit_log_config The config of which data should be logged at run exit
* @param _device_id The GPU that all runs should execute on
* @param _runner_id A unique index assigned to the runner
* @param _verbosity Verbosity level (Verbosity::Quiet, Verbosity::Default, Verbosity::Verbose)
* @param run_logs Reference to the vector to store generated run logs
* @param log_export_queue The queue of logs to be exported to disk
* @param log_export_queue_mutex This mutex must be locked to access log_export_queue
* @param log_export_queue_cdn The condition is notified every time a log has been added to the queue
* @param fast_err_detail Structure to store error details on fast failure for main thread rethrow
* @param _total_runners Total number of runners executing
* @param _isSWIG Flag denoting whether it's a Python build of FLAMEGPU
*/
MPISimRunner(const std::shared_ptr<const ModelData> _model,
std::atomic<unsigned int> &_err_ct,
std::atomic<unsigned int> &_next_run,
const RunPlanVector &_plans,
std::shared_ptr<const StepLoggingConfig> _step_log_config,
std::shared_ptr<const LoggingConfig> _exit_log_config,
int _device_id,
unsigned int _runner_id,
flamegpu::Verbosity _verbosity,
std::vector<RunLog> &run_logs,
std::queue<unsigned int> &log_export_queue,
std::mutex &log_export_queue_mutex,
std::condition_variable &log_export_queue_cdn,
ErrorDetail &fast_err_detail,
unsigned int _total_runners,
bool _isSWIG);
/**
* Start executing the SimRunner in its separate thread
*/
void start() override;
};

} // namespace detail
} // namespace flamegpu

#endif // INCLUDE_FLAMEGPU_SIMULATION_DETAIL_MPISIMRUNNER_H_
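The Signal values above imply that the runner and the MPI manager communicate through the runner's slot in the shared next_run counter: the manager writes a plan index into the slot, the runner executes it and writes RequestJob back. A sketch of that implied loop (assumptions only; the real MPISimRunner::start implementation is in the .cu changes, not shown in this diff):

#include <atomic>
#include <chrono>
#include <climits>
#include <thread>

// Illustrative only. The termination condition (an index >= plan_count) and the
// 1 ms polling interval are assumptions, not behaviour confirmed by this commit.
void mpiWorkerLoop(std::atomic<unsigned int> &next_run, unsigned int plan_count) {
    while (true) {
        const unsigned int job = next_run.load();
        if (job == UINT_MAX) {        // Signal::RequestJob: still waiting for the manager
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }
        if (job >= plan_count)        // assumed out-of-range sentinel: no more work
            break;
        // runSimulation(job);        // would execute the assigned RunPlan
        next_run.store(UINT_MAX);     // hand the slot back: request another job
    }
}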