Skip to content

Commit

Permalink
Move hpc scheduler detection into dedicated class. Add option to chec…
Browse files Browse the repository at this point in the history
…kpoint and terminate when a wall clock timer is found, e.g., from a scheduler
  • Loading branch information
Chrismarsh committed Jun 14, 2024
1 parent af157d0 commit dfe5e59
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 25 deletions.
39 changes: 16 additions & 23 deletions src/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,15 @@ void core::config_checkpoint( pt::ptree& value)
SPDLOG_DEBUG("Checkpointing on last timestep");
}

if(!_checkpoint_opts.on_last && !_checkpoint_opts.frequency)
_checkpoint_opts.on_outta_time = value.get_optional<bool>("on_wallclock_limit");

if(!_checkpoint_opts.on_last &&
!_checkpoint_opts.frequency)
{
CHM_THROW_EXCEPTION(config_error, "Checkpointing is enabled but checkpoint.frequency or checkpoint.on_last are not specified.");
}


}

auto file = value.get_optional<std::string>("load_checkpoint_path");
Expand Down Expand Up @@ -1200,25 +1204,7 @@ void core::init(int argc, char **argv)

SPDLOG_DEBUG("PID={}",getpid());

// Check if we are running under slurm
const char* SLURM_JOB_ID = std::getenv("SLURM_JOB_ID");
if(SLURM_JOB_ID)
{
const char* SLURM_TASK_PID = std::getenv("SLURM_TASK_PID"); //The process ID of the task being started.
const char* SLURM_PROCID = std::getenv("SLURM_PROCID"); // The MPI rank (or relative process ID) of the current process

SPDLOG_DEBUG("Detected running under SLURM as jobid {}", SLURM_JOB_ID);
SPDLOG_DEBUG( "SLURM_TASK_PID = {}", SLURM_TASK_PID);
SPDLOG_DEBUG( "SLURM_PROCID = {} ", SLURM_PROCID);
}

// check if we are running under PBS
const char* PBS_JOB_ID = std::getenv("PBS_JOBID");
if(PBS_JOB_ID)
{
SPDLOG_DEBUG("Detected running under PBS as jobid {}", PBS_JOB_ID);
}

_hpc_scheduler_info.detect();

pt::ptree cfg;
try
Expand Down Expand Up @@ -2142,7 +2128,10 @@ void core::run()
}

// save the current state
if(_checkpoint_opts.should_checkpoint(current_ts, (max_ts-1) == current_ts)) // -1 because current_ts is 0 indexed
if(_checkpoint_opts.should_checkpoint(current_ts,
(max_ts-1) == current_ts,
_hpc_scheduler_info
)) // -1 because current_ts is 0 indexed
{
SPDLOG_DEBUG("Checkpointing...");

Expand Down Expand Up @@ -2220,9 +2209,13 @@ void core::run()
tree);
}



SPDLOG_DEBUG("Done checkpoint [ {} s]", c.toc<s>());

// if we checkpointed because we are out of time, we need to stop the simulation
if(_checkpoint_opts.checkpoint_request_terminate)
{
done = true;
}
}

for (auto &itr : _outputs)
Expand Down
90 changes: 88 additions & 2 deletions src/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,30 +382,104 @@ class core

std::vector<output_info> _outputs;

// Detects various information about the HPC scheduler we might be run der
class hpc_scheduler_info
{
public:

boost::posix_time::time_duration max_wallclock; // maximum wallclock in seconds
boost::posix_time::ptime wallclock_start; // time we started the simulation at
bool has_wallclock_limit;

hpc_scheduler_info()
{
max_wallclock = boost::posix_time::seconds(0);
has_wallclock_limit = false;
}

/**
* If we have a wallclock limit, how much time left?
* Only produces a useful delta if has_wallclock_limit = true;
* @return
*/
boost::posix_time::time_duration wallclock_remaining()
{
return max_wallclock - (boost::posix_time::second_clock::local_time()-wallclock_start);
}

void detect()
{
// Check if we are running under slurm
const char* SLURM_JOB_ID = std::getenv("SLURM_JOB_ID");
if (SLURM_JOB_ID)
{
const char* SLURM_TASK_PID = std::getenv("SLURM_TASK_PID"); // The process ID of the task being started.
const char* SLURM_PROCID =
std::getenv("SLURM_PROCID"); // The MPI rank (or relative process ID) of the current process

SPDLOG_DEBUG("Detected running under SLURM as jobid {}", SLURM_JOB_ID);
SPDLOG_DEBUG("SLURM_TASK_PID = {}", SLURM_TASK_PID);
SPDLOG_DEBUG("SLURM_PROCID = {} ", SLURM_PROCID);
}


// check if we are running under PBS
const char* PBS_JOB_ID = std::getenv("PBS_JOBID");
if(PBS_JOB_ID)
{
SPDLOG_DEBUG("Detected running under PBS as jobid {}", PBS_JOB_ID);
}

const char* CHM_WALLCLOCK = std::getenv("CHM_WALLCLOCK");
if(CHM_WALLCLOCK)
{
try {
max_wallclock = boost::posix_time::duration_from_string(CHM_WALLCLOCK);
has_wallclock_limit = true;
wallclock_start = boost::posix_time::second_clock::local_time();
SPDLOG_DEBUG("Detected a max wallclock of {}", boost::posix_time::to_simple_string(max_wallclock));
} catch (...) {
CHM_THROW_EXCEPTION(chm_error, "The value given for environment variable CHM_WALLCLOCK is invalid");
}
}

}
} _hpc_scheduler_info;

// Checkpointing options
class chkptOp
{
public:
chkptOp():
do_checkpoint{false},
load_from_checkpoint{false},
on_last{false}{ }
on_last{false},
checkpoint_request_terminate{false}
{
abort_when_wallclock_left = boost::posix_time::minutes(5);
}

boost::filesystem::path ckpt_path; // root path to chckpoint folder
netcdf in_savestate; // if we are loading from checkpoint
bool do_checkpoint; // should we check point?
bool load_from_checkpoint; // are we loading from a checkpoint?
// amount of time to give ourselves to bail and checkpoint if we have a wall clock limit
boost::posix_time::time_duration abort_when_wallclock_left;

boost::optional<bool> on_outta_time; // bail when we are out of time
boost::optional<bool> on_last; //only checkpoint on the last timestep
boost::optional<size_t> frequency; // frequency of checkpoints

// used to stop the simulation when we checkpoint when we are outta time
bool checkpoint_request_terminate;

/**
* Should checkpointing occur
* @param current_ts
* @param is_last_ts
* @return
*/
bool should_checkpoint(size_t current_ts, bool is_last_ts)
bool should_checkpoint(size_t current_ts, bool is_last_ts, hpc_scheduler_info& scheduler_info)
{
if(!do_checkpoint)
return false;
Expand All @@ -417,6 +491,18 @@ class core
if( frequency && current_ts !=0 && (current_ts % *frequency ==0) )
return true;

// check if we are running out of time
if(on_outta_time && *on_outta_time &&
scheduler_info.has_wallclock_limit &&
scheduler_info.wallclock_remaining() <= abort_when_wallclock_left
)
{
SPDLOG_DEBUG("Detected wallclock of {} remaining. Triggering checkpoint.",
boost::posix_time::to_simple_string(scheduler_info.wallclock_remaining()));
checkpoint_request_terminate = true;
return true;
}

return false;
}

Expand Down

0 comments on commit dfe5e59

Please sign in to comment.