Skip to content

Commit

Permalink
Merge pull request #316 from lf-lang/initial-STA-wait
Browse files Browse the repository at this point in the history
Handle messages arriving during initial STA wait
  • Loading branch information
edwardalee authored Dec 13, 2023
2 parents 61261af + e8aba6c commit fe39d61
Showing 1 changed file with 39 additions and 17 deletions.
56 changes: 39 additions & 17 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ void _lf_initialize_start_tag(environment_t *env) {
// Add reactions invoked at tag (0,0) (including startup reactions) to the reaction queue
_lf_trigger_startup_reactions(env);

#ifdef FEDERATED
#if defined FEDERATED
// If env is the environment for the top-level enclave, then initialize the federate.
environment_t *top_level_env;
_lf_get_environments(&top_level_env);
Expand All @@ -703,24 +703,21 @@ void _lf_initialize_start_tag(environment_t *env) {
// Get a start_time from the RTI
synchronize_with_other_federates(); // Resets start_time in federated execution according to the RTI.
}

// The start time will likely have changed. Adjust the current tag and stop tag.
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};
if (duration >= 0LL) {
// A duration has been specified. Recalculate the stop time.
env->stop_tag = ((tag_t) {.time = start_time + duration, .microstep = 0});
}
#endif

_lf_initialize_timers(env);

// If the stop_tag is (0,0), also insert the shutdown
// reactions. This can only happen if the timeout time
// was set to 0.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
_lf_trigger_shutdown_reactions(env);
}
// If we have a non-zero STA offset, then we need to allow messages to arrive
// prior to the start time. To avoid spurious STP violations, we temporarily
// set the current time back by the STA offset.
env->current_tag = (tag_t){.time = start_time - _lf_fed_STA_offset, .microstep = 0u};

#if defined FEDERATED
// Call wait_until if federated. This is required because the startup procedure
// in synchronize_with_other_federates() can decide on a new start_time that is
// larger than the current physical time.
Expand All @@ -739,15 +736,25 @@ void _lf_initialize_start_tag(environment_t *env) {
// Here we wait until the start time and also release the environment mutex.
// this means that the other worker threads will be allowed to start. We need
// this to avoid potential deadlock in federated startup.
while(!wait_until(env, start_time, &env->event_q_changed)) {};
LF_PRINT_DEBUG("Done waiting for start time " PRINTF_TIME ".", start_time);
LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be small.",
while(!wait_until(env, start_time + _lf_fed_STA_offset, &env->event_q_changed)) {};
LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + _lf_fed_STA_offset);
LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME
". This should be close to the STA offset.",
lf_time_physical() - start_time);

// Each federate executes the start tag (which is the current
// tag). Inform the RTI of this if needed.
send_next_event_tag(env, env->current_tag, true);
#endif
// Restore the current tag to match the start time.
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

// For messages that may have arrived while we were waiting, put
// reactions on the reaction queue.
_lf_pop_events(env);

// If the stop_tag is (0,0), also insert the shutdown
// reactions. This can only happen if the timeout time
// was set to 0.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
_lf_trigger_shutdown_reactions(env);
}

#ifdef FEDERATED_DECENTRALIZED
// In federated execution (at least under decentralized coordination),
Expand All @@ -759,7 +766,22 @@ void _lf_initialize_start_tag(environment_t *env) {
// to be removed, if appropriate before proceeding to executing tag (0,0).
_lf_wait_on_tag_barrier(env, (tag_t){.time=start_time,.microstep=0});
spawn_staa_thread();
#endif // FEDERATED_DECENTRALIZED

#else // NOT FEDERATED_DECENTRALIZED
// Each federate executes the start tag (which is the current
// tag). Inform the RTI of this if needed.
send_next_event_tag(env, env->current_tag, true);
#endif // NOT FEDERATED_DECENTRALIZED
#else // NOT FEDERATED
_lf_initialize_timers(env);

// If the stop_tag is (0,0), also insert the shutdown
// reactions. This can only happen if the timeout time
// was set to 0.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
_lf_trigger_shutdown_reactions(env);
}
#endif // NOT FEDERATED

// Set the following boolean so that other thread(s), including federated threads,
// know that the execution has started
Expand Down

0 comments on commit fe39d61

Please sign in to comment.