From 4c3c0daf96bd636847a9dc6e76fefe4099864119 Mon Sep 17 00:00:00 2001 From: erling Date: Tue, 15 Oct 2024 18:38:47 -0700 Subject: [PATCH] Partial code-review (#68) --- examples/posix/testing_fed_conn.c | 14 +- examples/posix/timer_ex.c | 5 +- .../zephyr/federated/src/testing_fed_conn.c | 300 ------------------ include/reactor-uc/action.h | 14 +- include/reactor-uc/connection.h | 10 +- include/reactor-uc/environment.h | 35 +- include/reactor-uc/error.h | 6 + include/reactor-uc/federated.h | 2 +- include/reactor-uc/scheduler.h | 51 ++- include/reactor-uc/trigger.h | 6 +- .../{trigger_value.h => trigger_data_queue.h} | 25 +- .../lflang/generator/uc/UcMainGenerator.kt | 2 +- pr32-report.txt | 60 ---- scripts/ci/reports/test/expected.txt | 2 +- scripts/ci/reports/test/main.txt | 2 +- scripts/ci/reports/test/update.txt | 2 +- src/action.c | 16 +- src/connection.c | 46 +-- src/environment.c | 33 +- src/federated.c | 25 +- src/platform/posix/posix.c | 2 +- src/reactor.c | 3 +- src/scheduler.c | 237 +++++++------- src/tag.c | 12 - src/timer.c | 4 +- src/trigger.c | 8 +- src/{trigger_value.c => trigger_data_queue.c} | 22 +- test/unit/action_microstep_test.c | 3 +- test/unit/action_test.c | 2 +- test/unit/delayed_conn_test.c | 2 +- test/unit/physical_action_test.c | 2 +- test/unit/port_test.c | 2 +- test/unit/timer_test.c | 5 +- ...value_test.c => trigger_data_queue_test.c} | 10 +- 34 files changed, 329 insertions(+), 641 deletions(-) delete mode 100644 examples/zephyr/federated/src/testing_fed_conn.c rename include/reactor-uc/{trigger_value.h => trigger_data_queue.h} (61%) delete mode 100644 pr32-report.txt rename src/{trigger_value.c => trigger_data_queue.c} (59%) rename test/unit/{trigger_value_test.c => trigger_data_queue_test.c} (79%) diff --git a/examples/posix/testing_fed_conn.c b/examples/posix/testing_fed_conn.c index 24c0234a..9b4968c3 100644 --- a/examples/posix/testing_fed_conn.c +++ b/examples/posix/testing_fed_conn.c @@ -234,9 +234,9 @@ void *main_sender(void *unused) { (void)unused; Environment_ctor(&env_send, (Reactor *)&sender); MainSender_ctor(&sender, &env_send); - env_send.set_timeout(&env_send, SEC(1)); - env_send.net_channel_size = 1; - env_send.net_channels = (NetworkChannel **)&sender.net_channel; + env_send.scheduler.set_timeout(&env_send.scheduler, SEC(1)); + env_send.net_bundles_size = 1; + env_send.net_bundles = (FederatedConnectionBundle **)&sender.bundle; env_send.assemble(&env_send); env_send.start(&env_send); return NULL; @@ -249,11 +249,11 @@ void *main_recv(void *unused) { Environment_ctor(&env_recv, (Reactor *)&receiver); env_recv.platform->enter_critical_section(env_recv.platform); MainRecv_ctor(&receiver, &env_recv); - env_recv.set_timeout(&env_recv, SEC(1)); - env_recv.keep_alive = true; + env_recv.scheduler.set_timeout(&env_recv.scheduler, SEC(1)); + env_recv.scheduler.keep_alive = true; env_recv.has_async_events = true; - env_recv.net_channel_size = 1; - env_recv.net_channels = (NetworkChannel **)&receiver.net_channels; + env_recv.net_bundles_size = 1; + env_recv.net_bundles = (FederatedConnectionBundle **)&receiver.bundle; env_recv.assemble(&env_recv); env_recv.platform->leave_critical_section(env_recv.platform); env_recv.start(&env_recv); diff --git a/examples/posix/timer_ex.c b/examples/posix/timer_ex.c index 06357830..97989046 100644 --- a/examples/posix/timer_ex.c +++ b/examples/posix/timer_ex.c @@ -18,7 +18,8 @@ struct MyReactor { }; void reaction_0_body(struct MyReactor *self, MyTimer *my_timer) { - printf("Hello World @ %ld\n", self->super.env->current_tag.time); + Environment *env = self->super.env; + printf("Hello World @ %ld\n", env->get_elapsed_physical_time(env)); } void reaction_0_wrapper(Reaction *_self) { @@ -45,7 +46,7 @@ int main() { struct MyReactor my_reactor; Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); - env.set_timeout(&env, SEC(1)); + env.scheduler.set_timeout(&env.scheduler, SEC(1)); MyReactor_ctor(&my_reactor, &env); env.assemble(&env); env.start(&env); diff --git a/examples/zephyr/federated/src/testing_fed_conn.c b/examples/zephyr/federated/src/testing_fed_conn.c deleted file mode 100644 index f8e87e08..00000000 --- a/examples/zephyr/federated/src/testing_fed_conn.c +++ /dev/null @@ -1,300 +0,0 @@ -#include "reactor-uc/reactor-uc.h" -#include -#include - -#define PORT_NUM 8901 - -typedef struct { - char msg[32]; -} msg_t; - -// Reactor Sender -typedef struct { - Timer super; - Reaction *effects[1]; -} Timer1; - -typedef struct { - Reaction super; -} Reaction1; - -typedef struct { - Output super; - Reaction *sources[1]; - msg_t value; -} Out; - -typedef struct { - Reactor super; - Reaction1 reaction; - Timer1 timer; - Out out; - Reaction *_reactions[1]; - Trigger *_triggers[1]; -} Sender; - -void timer_handler(Reaction *_self) { - Sender *self = (Sender *)_self->parent; - Environment *env = self->super.env; - Out *out = &self->out; - - printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env)); - msg_t val; - strcpy(val.msg, "Hello From Sender"); - lf_set(out, val); -} - -void Reaction1_ctor(Reaction1 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, timer_handler, NULL, 0, 0); -} - -void Out_ctor(Out *self, Sender *parent) { - self->sources[0] = &parent->reaction.super; - Output_ctor(&self->super, &parent->super, self->sources, 1); -} - -void Sender_ctor(Sender *self, Reactor *parent, Environment *env) { - self->_reactions[0] = (Reaction *)&self->reaction; - self->_triggers[0] = (Trigger *)&self->timer; - Reactor_ctor(&self->super, "Sender", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); - Reaction1_ctor(&self->reaction, &self->super); - Timer_ctor(&self->timer.super, &self->super, 0, MSEC(100), self->timer.effects, 1); - Out_ctor(&self->out, self); - TIMER_REGISTER_EFFECT(self->timer, self->reaction); - - // Register reaction as a source for out - OUTPUT_REGISTER_SOURCE(self->out, self->reaction); -} - -// Reactor Receiver -typedef struct { - Reaction super; -} Reaction2; - -typedef struct { - Input super; - msg_t buffer[1]; - Reaction *effects[1]; -} In; - -typedef struct { - Reactor super; - Reaction2 reaction; - In inp; - int cnt; - Reaction *_reactions[1]; - Trigger *_triggers[1]; -} Receiver; - -void In_ctor(In *self, Receiver *parent) { - Input_ctor(&self->super, &parent->super, self->effects, 1, self->buffer, sizeof(self->buffer[0])); -} - -void input_handler(Reaction *_self) { - Receiver *self = (Receiver *)_self->parent; - Environment *env = self->super.env; - In *inp = &self->inp; - - printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), lf_get(inp).msg); -} - -void Reaction2_ctor(Reaction2 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, input_handler, NULL, 0, 0); -} - -void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) { - self->_reactions[0] = (Reaction *)&self->reaction; - self->_triggers[0] = (Trigger *)&self->inp; - Reactor_ctor(&self->super, "Receiver", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); - Reaction2_ctor(&self->reaction, &self->super); - In_ctor(&self->inp, self); - - // Register reaction as an effect of in - INPUT_REGISTER_EFFECT(self->inp, self->reaction); -} - -typedef struct { - FederatedOutputConnection super; - msg_t buffer[1]; -} ConnSender; - -void ConnSender_ctor(ConnSender *self, Reactor *parent, FederatedConnectionBundle *bundle, Port *upstream) { - FederatedOutputConnection_ctor(&self->super, parent, bundle, 0, upstream, &self->buffer[0], sizeof(self->buffer[0])); -} - -typedef struct { - FederatedConnectionBundle super; - TcpIpBundle bundle; - ConnSender conn; - FederatedOutputConnection *output[1]; -} SenderRecvBundle; - -void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { - TcpIpBundle_ctor(&self->bundle, "127.0.0.1", PORT_NUM, AF_INET); - ConnSender_ctor(&self->conn, &parent->super, &self->super, &parent->out.super.super); - self->output[0] = &self->conn.super; - - TcpIpBundle *bundle = &self->bundle; - int ret = bundle->bind(bundle); - validate(ret == LF_OK); - printf("Sender: Bound\n"); - - // accept one connection - bool new_connection = bundle->accept(bundle); - validate(new_connection); - printf("Sender: Accepted\n"); - - FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->bundle, NULL, 0, - (FederatedOutputConnection **)&self->output, 1); -} - -typedef struct { - FederatedInputConnection super; - msg_t buffer[5]; - Input *downstreams[1]; -} ConnRecv; - -void ConnRecv_ctor(ConnRecv *self, Reactor *parent) { - FederatedInputConnection_ctor(&self->super, parent, MSEC(100), false, (Port **)&self->downstreams, 1, - &self->buffer[0], sizeof(self->buffer[0]), 5); -} - -typedef struct { - FederatedConnectionBundle super; - TcpIpBundle bundle; - ConnRecv conn; - FederatedInputConnection *inputs[1]; -} RecvSenderBundle; - -void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { - ConnRecv_ctor(&self->conn, parent); - TcpIpBundle_ctor(&self->bundle, "127.0.0.1", PORT_NUM, AF_INET); - self->inputs[0] = &self->conn.super; - - TcpIpBundle *bundle = &self->bundle; - - lf_ret_t ret; - do { - ret = bundle->connect(bundle); - } while (ret != LF_OK); - validate(ret == LF_OK); - printf("Recv: Connected\n"); - - FederatedConnectionBundle_ctor(&self->super, parent, &self->bundle, (FederatedInputConnection **)&self->inputs, 1, - NULL, 0); -} - -// Reactor main -struct MainSender { - Reactor super; - Sender sender; - SenderRecvBundle bundle; - - TcpIpBundle *net_bundles[1]; - Reactor *_children[1]; -}; - -struct MainRecv { - Reactor super; - Receiver receiver; - RecvSenderBundle bundle; - TcpIpBundle *net_bundles[1]; - - Reactor *_children[1]; -}; - -void MainSender_ctor(struct MainSender *self, Environment *env) { - self->_children[0] = &self->sender.super; - Sender_ctor(&self->sender, &self->super, env); - - SenderRecvConn_ctor(&self->bundle, &self->sender); - Reactor_ctor(&self->super, "MainSender", env, NULL, self->_children, 1, NULL, 0, NULL, 0); - - self->net_bundles[0] = &self->bundle.bundle; -} - -void MainRecv_ctor(struct MainRecv *self, Environment *env) { - self->_children[0] = &self->receiver.super; - Receiver_ctor(&self->receiver, &self->super, env); - - RecvSenderBundle_ctor(&self->bundle, &self->super); - - CONN_REGISTER_DOWNSTREAM(self->bundle.conn, self->receiver.inp); - Reactor_ctor(&self->super, "MainRecv", env, NULL, self->_children, 1, NULL, 0, NULL, 0); - - self->net_bundles[0] = &self->bundle.bundle; -} - -Environment env_send; -struct MainSender sender; -void *main_sender(void *unused) { - (void)unused; - Environment_ctor(&env_send, (Reactor *)&sender); - MainSender_ctor(&sender, &env_send); - env_send.set_timeout(&env_send, SEC(1)); - env_send.net_bundles_size = 1; - env_send.net_bundles = (TcpIpBundle **)&sender.net_bundles; - env_send.assemble(&env_send); - env_send.start(&env_send); - return NULL; -} - -Environment env_recv; -struct MainRecv receiver; -void *main_recv(void *unused) { - (void)unused; - Environment_ctor(&env_recv, (Reactor *)&receiver); - env_recv.platform->enter_critical_section(env_recv.platform); - MainRecv_ctor(&receiver, &env_recv); - env_recv.set_timeout(&env_recv, SEC(1)); - env_recv.keep_alive = true; - env_recv.has_async_events = true; - env_recv.net_bundles_size = 1; - env_recv.net_bundles = (TcpIpBundle **)&receiver.net_bundles; - env_recv.assemble(&env_recv); - env_recv.platform->leave_critical_section(env_recv.platform); - env_recv.start(&env_recv); - return NULL; -} - -void lf_exit(void) { - Environment_free(&env_send); - Environment_free(&env_recv); -} - -char t1_stack[4096]; -char t2_stack[4096]; -int main() { - pthread_t thread1; - pthread_attr_t attr1; - pthread_t thread2; - pthread_attr_t attr2; - int ret; - if (atexit(lf_exit) != 0) { - validate(false); - } - - pthread_attr_init(&attr1); - pthread_attr_setstack(&attr1, t1_stack, 4096); - int args; - // Create the first thread running func1 - if (ret = pthread_create(&thread1, &attr1, main_recv, (void *)&args)) { - printf("Error creating thread 1 %d\n", ret); - return 1; - } - - pthread_attr_init(&attr2); - pthread_attr_setstack(&attr2, t2_stack, 4096); - // Create the second thread running func2 - if (ret = pthread_create(&thread2, &attr1, main_sender, (void *)&args)) { - printf("Error creating thread 2 %d\n", ret); - return 1; - } - - // Wait for both threads to finish - pthread_join(thread1, NULL); - pthread_join(thread2, NULL); - - printf("Both threads have finished\n"); - return 0; -} diff --git a/include/reactor-uc/action.h b/include/reactor-uc/action.h index 19addc5d..a3b2339e 100644 --- a/include/reactor-uc/action.h +++ b/include/reactor-uc/action.h @@ -10,13 +10,13 @@ typedef struct LogicalAction LogicalAction; typedef struct PhysicalAction PhysicalAction; struct Action { - Trigger super; // Inherit from Trigger - interval_t min_offset; // The minimum offset from the current time that an event can be scheduled on this action. - interval_t min_spacing; // The minimum spacing between two consecutive events on this action. - tag_t previous_event; // Used to enforce min_spacing - TriggerEffects effects; // The reactions triggered by this Action. - TriggerSources sources; // The reactions that can write to this Action. - TriggerValue trigger_value; // FIFO storage of the data associated with the events scheduled on this action. + Trigger super; // Inherit from Trigger + interval_t min_offset; // The minimum offset from the current time that an event can be scheduled on this action. + interval_t min_spacing; // The minimum spacing between two consecutive events on this action. + tag_t previous_event; // Used to enforce min_spacing + TriggerEffects effects; // The reactions triggered by this Action. + TriggerSources sources; // The reactions that can write to this Action. + TriggerDataQueue trigger_data_queue; // FIFO storage of the data associated with the events scheduled on this action. /** * @brief Schedule an event on this action. */ diff --git a/include/reactor-uc/connection.h b/include/reactor-uc/connection.h index d7431ead..d5618989 100644 --- a/include/reactor-uc/connection.h +++ b/include/reactor-uc/connection.h @@ -46,14 +46,14 @@ struct Connection { * @param parent The reactor in which this connection appears (not the reactors of the ports it connects) * @param downstreams A pointer to an array of pointers to downstream ports. * @param num_downstreams The size of the downstreams array. - * @param trigger_value A pointer to the TriggerValue that holds the data of the events that are scheduled on this - * connection. + * @param trigger_data_queue A pointer to the TriggerDataQueue that holds the data of the events that are scheduled on + * this connection. * @param prepare The prepare function that is called before the connection triggers its downstreams. * @param cleanup The cleanup function that is called at the end of timestep after all reactions have executed. * @param trigger_downstreams The function that triggers all downstreams of this connection. */ void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams, - TriggerValue *trigger_value, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), + TriggerDataQueue *trigger_data_queue, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), void (*trigger_downstreams)(Connection *, const void *, size_t)); struct LogicalConnection { @@ -65,7 +65,7 @@ void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port **dow struct DelayedConnection { Connection super; interval_t delay; - TriggerValue trigger_value; + TriggerDataQueue trigger_data_queue; }; void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, @@ -74,7 +74,7 @@ void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **dow struct PhysicalConnection { Connection super; interval_t delay; - TriggerValue trigger_value; + TriggerDataQueue trigger_data_queue; }; void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, diff --git a/include/reactor-uc/environment.h b/include/reactor-uc/environment.h index 1b6bdaca..af99ada3 100644 --- a/include/reactor-uc/environment.h +++ b/include/reactor-uc/environment.h @@ -9,43 +9,37 @@ #include "reactor-uc/scheduler.h" typedef struct Environment Environment; -typedef struct TcpIpChannel TcpIpChannel; struct Environment { - Reactor *main; // The top-level reactor of the program. - Scheduler scheduler; // The scheduler in charge of executing the reactions. - Platform *platform; // The platform that provides the physical time and sleep functions. - tag_t stop_tag; // The tag at which the program should stop. This is set by the user or by the scheduler. - tag_t current_tag; // The current logical tag. Set by the scheduler and read by user in the reaction bodies. - instant_t start_time; // The physical time at which the program started. - bool keep_alive; // Whether the program should keep running even if there are no more events to process. - bool has_async_events; // Whether the environment either has an action, or has a connection to an upstream federate. - Startup *startup; // A pointer to a startup trigger, if the program has one. - Shutdown *shutdown; // A pointer to a chain of shutdown triggers, if the program has one. - NetworkChannel **net_channels; // A pointer to an array of NetworkChannel pointers that are used to communicate with - // other federates running in different environments. - size_t net_channel_size; // The number of NetworkChannels in the net_channels array. + Reactor *main; // The top-level reactor of the program. + Scheduler scheduler; // The scheduler in charge of executing the reactions. + Platform *platform; // The platform that provides the physical time and sleep functions. + bool has_async_events; + Startup *startup; // A pointer to a startup trigger, if the program has one. + Shutdown *shutdown; // A pointer to a chain of shutdown triggers, if the program has one. + FederatedConnectionBundle **net_bundles; // A pointer to an array of NetworkChannel pointers that are used to + // communicate with other federates running in different environments. + size_t net_bundles_size; // The number of NetworkChannels in the net_channels array. /** * @brief Assemble the program by computing levels for each reaction and setting up the scheduler. */ void (*assemble)(Environment *self); + /** * @brief Start the program. */ void (*start)(Environment *self); + /** * @brief Wrapper around `wait_until` exposed by the platform. - * TODO: Is this needed? */ lf_ret_t (*wait_until)(Environment *self, instant_t wakeup_time); - /** - * @brief Set the stop tag of the program based on a timeout duration. - */ - void (*set_timeout)(Environment *self, interval_t duration); + /** * @brief Get the elapsed logical time since the start of the program. */ interval_t (*get_elapsed_logical_time)(Environment *self); + /** * @brief Get the current logical time */ @@ -58,6 +52,9 @@ struct Environment { * @brief Get the current physical time. */ instant_t (*get_physical_time)(Environment *self); + + void (*enter_critical_section)(Environment *self); + void (*leave_critical_section)(Environment *self); }; void Environment_ctor(Environment *self, Reactor *main); diff --git a/include/reactor-uc/error.h b/include/reactor-uc/error.h index 4fadd657..4cc79b8b 100644 --- a/include/reactor-uc/error.h +++ b/include/reactor-uc/error.h @@ -44,4 +44,10 @@ typedef enum { } \ } while (0) +#define throw(msg) \ + do { \ + printf("Exception `%s` at %s:%d\n", msg, __FILE__, __LINE__); \ + exit(1); \ + } while (0) + #endif diff --git a/include/reactor-uc/federated.h b/include/reactor-uc/federated.h index cd4f0699..dc33a46f 100644 --- a/include/reactor-uc/federated.h +++ b/include/reactor-uc/federated.h @@ -49,7 +49,7 @@ struct FederatedInputConnection { bool is_physical; // Is the connection physical? tag_t last_known_tag; // The latest tag this input is known at. instant_t safe_to_assume_absent; // - TriggerValue trigger_value; + TriggerDataQueue trigger_data_queue; int conn_id; void (*schedule)(FederatedInputConnection *self, TaggedMessage *msg); }; diff --git a/include/reactor-uc/scheduler.h b/include/reactor-uc/scheduler.h index ef5c9624..cb88b4e6 100644 --- a/include/reactor-uc/scheduler.h +++ b/include/reactor-uc/scheduler.h @@ -15,17 +15,62 @@ struct Scheduler { // that are registered for cleanup at the end of the current tag. Trigger *cleanup_ll_head; Trigger *cleanup_ll_tail; - bool executing_tag; + instant_t start_time; // The physical time at which the program started. + tag_t stop_tag; // The tag at which the program should stop. This is set by the user or by the scheduler. + tag_t current_tag; // The current logical tag. Set by the scheduler and read by user in the reaction bodies. + bool keep_alive; // Whether the program should keep running even if there are no more events to process. + + /** + * @brief Schedules an event on trigger at a specified tag. This function will + * enter a critcal section if the environment has async events. + */ lf_ret_t (*schedule_at)(Scheduler *self, Trigger *trigger, tag_t tag); + + /** + * @brief Schedules an event on a trigger at a specified tag. This function + * assumes that we are in a critical section (if this is needed). + * + */ lf_ret_t (*schedule_at_locked)(Scheduler *self, Trigger *trigger, tag_t tag); + + /** + * @brief Runs the program. Does not return until program has completed. + */ void (*run)(Scheduler *self); + + /** + * @brief After committing to a tag, but before executing reactions, the + * scheduler must prepare the timestep by adding reactions to the reaction + * queue. + */ void (*prepare_timestep)(Scheduler *self, tag_t tag); + + /** + * @brief After completing all reactions at a tag, this function is called to + * reset is_present fields and increment index pointers of the TriggerDataQueue. + */ void (*clean_up_timestep)(Scheduler *self); + + /** + * @brief Called after `prepare_timestep` to run all reactions on the current + * tag. + */ void (*run_timestep)(Scheduler *self); + + /** + * @brief Called to execute all reactions triggered by a shutdown trigger. + */ void (*terminate)(Scheduler *self); - // Register Trigger for cleanup. Cleanup happens after all reactions have - // executed at a particular tag. + /** + * @brief Set the stop tag of the program based on a timeout duration. + */ + void (*set_timeout)(Scheduler *self, interval_t duration); + + /** + * @brief Register Trigger for cleanup. The cleanup function of the trigger + * will be called in `clean_up_timestep`. + */ void (*register_for_cleanup)(Scheduler *self, Trigger *trigger); }; diff --git a/include/reactor-uc/trigger.h b/include/reactor-uc/trigger.h index d6525b2e..e99fdad1 100644 --- a/include/reactor-uc/trigger.h +++ b/include/reactor-uc/trigger.h @@ -4,7 +4,7 @@ #include "reactor-uc/macros.h" #include "reactor-uc/reaction.h" #include "reactor-uc/reactor.h" -#include "reactor-uc/trigger_value.h" +#include "reactor-uc/trigger_data_queue.h" #include typedef struct Trigger Trigger; @@ -61,14 +61,14 @@ struct Trigger { // linked list of triggers registered for cleanup Trigger *next; // For chaining together triggers, used by Scheduler to store triggers that should be cleaned up in a // linked list - TriggerValue *trigger_value; // A pointer to a TriggerValue field in a child type, Can be NULL + TriggerDataQueue *trigger_data_queue; // A pointer to a TriggerDataQueue field in a child type, Can be NULL void (*prepare)(Trigger *); void (*cleanup)(Trigger *); const void *(*get)(Trigger *); } __attribute__((aligned(MEM_ALIGNMENT))); // FIXME: This should not be necessary... -void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, TriggerValue *trigger_value, +void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, TriggerDataQueue *trigger_data_queue, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), const void *(*get)(Trigger *)); #endif diff --git a/include/reactor-uc/trigger_value.h b/include/reactor-uc/trigger_data_queue.h similarity index 61% rename from include/reactor-uc/trigger_value.h rename to include/reactor-uc/trigger_data_queue.h index d9644707..68c1d06a 100644 --- a/include/reactor-uc/trigger_value.h +++ b/include/reactor-uc/trigger_data_queue.h @@ -1,26 +1,25 @@ -#ifndef REACTOR_UC_TRIGGER_VALUE_H -#define REACTOR_UC_TRIGGER_VALUE_H +#ifndef REACTOR_UC_TRIGGER_DATA_QUEUE_H +#define REACTOR_UC_TRIGGER_DATA_QUEUE_H #include "reactor-uc/error.h" #include #include -typedef struct TriggerValue TriggerValue; +typedef struct TriggerDataQueue TriggerDataQueue; -// FIXME: Handle "void" TriggerValues /** - * @brief A TriggerValue is a type wrapping the memory associated with a Trigger. - * In reactor-uc this memory must be statically allocated and TriggerValue - * implements a FIFO around it. The TriggerValue needs a pointer to the allocated + * @brief A TriggerDataQueue is a type wrapping the memory associated with a Trigger. + * In reactor-uc this memory must be statically allocated and TriggerDataQueue + * implements a FIFO around it. The TriggerDataQueue needs a pointer to the allocated * memory and the allocated size as well as the size of each element. Then we * can stage, push and pop data from the runtime without knowing about the types * involved.. * */ -struct TriggerValue { +struct TriggerDataQueue { char *buffer; size_t read_idx; size_t write_idx; - size_t value_size; + size_t value_size; // TODO: BEtter name. size_t capacity; bool empty; bool staged; @@ -32,22 +31,22 @@ struct TriggerValue { * multiple writes to a trigger in a single tag and "last write wins." * */ - lf_ret_t (*stage)(TriggerValue *, const void *value); + lf_ret_t (*stage)(TriggerDataQueue *, const void *value); /** * @brief Pushes the staged value into the FIFO. * */ - lf_ret_t (*push)(TriggerValue *); + lf_ret_t (*push)(TriggerDataQueue *); /** * @brief Increments the `read_idx` and as such pops the head of the * queue. This function does not return the head of the queue. Only increments * the pointers. */ - lf_ret_t (*pop)(TriggerValue *); + lf_ret_t (*pop)(TriggerDataQueue *); }; -void TriggerValue_ctor(TriggerValue *self, char *buffer, size_t value_size, size_t capacity); +void TriggerDataQueue_ctor(TriggerDataQueue *self, char *buffer, size_t value_size, size_t capacity); #endif diff --git a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt index 888c6a02..268ed5c7 100644 --- a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt +++ b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt @@ -43,7 +43,7 @@ class UcMainGenerator( |static ${main.codeType} main_reactor; |void lf_main(void) { | Environment_ctor(&env, &main_reactor.super); - | ${if (targetConfig.isSet(TimeOutProperty.INSTANCE)) "env.set_timeout(&env, ${targetConfig.get(TimeOutProperty.INSTANCE).toCCode()});" else ""} + | ${if (targetConfig.isSet(TimeOutProperty.INSTANCE)) "env.scheduler.set_timeout(&env.scheduler, ${targetConfig.get(TimeOutProperty.INSTANCE).toCCode()});" else ""} | ${main.codeType}_ctor(&main_reactor, &env, NULL); | env.assemble(&env); | env.start(&env); diff --git a/pr32-report.txt b/pr32-report.txt deleted file mode 100644 index 630ef5d9..00000000 --- a/pr32-report.txt +++ /dev/null @@ -1,60 +0,0 @@ -Test 0: action_test.c: -% text: 7.20 (from 36104 -> 38702) -% data: -2.47 (from 728 -> 710) -% bss : 56.25 (from 512 -> 800) -% dec : 7.68 (from 37344 -> 40212) - -Test 1: delayed.conn_test.c: -% text: -15.05 (from 39103 -> 33218) -% data: -51.10 (from 728 -> 356) -% bss : 138.67 (from 512 -> 1222) -% dec : -14.00 (from 40343 -> 34696) - -Test 2: event_queue_test.c: -% text: -0.22 (from 22472 -> 22422) -% data: 20.25 (from 632 -> 760) -% bss : 0.00 (from 320 -> 320) -% dec : 0.33 (from 23424 -> 23502) - -Test 3: physical_action_test.c: -% text: 12.55 (from 37566 -> 42280) -% data: 157.72 (from 745 -> 1920) -% bss : -89.75 (from 1952 -> 200) -% dec : 10.27 (from 40263 -> 44400) - -Test 4: port_test.c: -% text: -17.01 (from 39067 -> 32420) -% data: 5.49 (from 728 -> 768) -% bss : -2.34 (from 512 -> 500) -% dec : -16.42 (from 40307 -> 33688) - -Test 5: reaction_queue_test.c: -% text: 35.07 (from 22117 -> 29874) -% data: -36.71 (from 632 -> 400) -% bss : -15.62 (from 320 -> 270) -% dec : 32.40 (from 23069 -> 30544) - -Test 6: shutdown_test.c: -% text: -60.66 (from 32808 -> 12908) -% data: -41.67 (from 720 -> 420) -% bss : 52.34 (from 512 -> 780) -% dec : -58.55 (from 34040 -> 14108) - -Test 7: startup_test.c: -% text: -53.36 (from 31926 -> 14890) -% data: 0.00 (from 720 -> 720) -% bss : 0.00 (from 512 -> 512) -% dec : -51.38 (from 33158 -> 16122) - -Test 8: timer_test.c: -% text: -2.67 (from 32042 -> 31188) -% data: -9.17 (from 720 -> 654) -% bss : 75.78 (from 512 -> 900) -% dec : -1.60 (from 33274 -> 32742) - -Test 9: trigger_value_test.c: -% text: -3.02 (from 18191 -> 17642) -% data: -31.65 (from 632 -> 432) -% bss : -37.50 (from 320 -> 200) -% dec : -4.54 (from 19143 -> 18274) - diff --git a/scripts/ci/reports/test/expected.txt b/scripts/ci/reports/test/expected.txt index 630ef5d9..b1946c18 100644 --- a/scripts/ci/reports/test/expected.txt +++ b/scripts/ci/reports/test/expected.txt @@ -52,7 +52,7 @@ Test 8: timer_test.c: % bss : 75.78 (from 512 -> 900) % dec : -1.60 (from 33274 -> 32742) -Test 9: trigger_value_test.c: +Test 9: trigger_data_queue_test.c: % text: -3.02 (from 18191 -> 17642) % data: -31.65 (from 632 -> 432) % bss : -37.50 (from 320 -> 200) diff --git a/scripts/ci/reports/test/main.txt b/scripts/ci/reports/test/main.txt index e475959d..f099d18e 100644 --- a/scripts/ci/reports/test/main.txt +++ b/scripts/ci/reports/test/main.txt @@ -17,4 +17,4 @@ text data bss dec hex filename 32042 720 512 33274 81fa timer_test_c text data bss dec hex filename - 18191 632 320 19143 4ac7 trigger_value_test_c \ No newline at end of file + 18191 632 320 19143 4ac7 trigger_data_queue_test_c \ No newline at end of file diff --git a/scripts/ci/reports/test/update.txt b/scripts/ci/reports/test/update.txt index 54ad1d33..7913b73d 100644 --- a/scripts/ci/reports/test/update.txt +++ b/scripts/ci/reports/test/update.txt @@ -17,4 +17,4 @@ text data bss dec hex filename 31188 654 900 32742 7fe6 timer_test_c text data bss dec hex filename - 17642 432 200 18274 4762 trigger_value_test_c \ No newline at end of file + 17642 432 200 18274 4762 trigger_data_queue_test_c \ No newline at end of file diff --git a/src/action.c b/src/action.c index c44ac647..01bd15ae 100644 --- a/src/action.c +++ b/src/action.c @@ -10,7 +10,7 @@ void Action_cleanup(Trigger *self) { LF_DEBUG(TRIG, "Cleaning up action %p", self); Action *act = (Action *)self; self->is_present = false; - validaten(act->trigger_value.pop(&act->trigger_value)); + validaten(act->trigger_data_queue.pop(&act->trigger_data_queue)); } void Action_prepare(Trigger *self) { @@ -29,8 +29,8 @@ void Action_prepare(Trigger *self) { void Action_ctor(Action *self, TriggerType type, interval_t min_offset, interval_t min_spacing, Reactor *parent, Reaction **sources, size_t sources_size, Reaction **effects, size_t effects_size, void *value_buf, size_t value_size, size_t value_capacity, lf_ret_t (*schedule)(Action *, interval_t, const void *)) { - TriggerValue_ctor(&self->trigger_value, value_buf, value_size, value_capacity); - Trigger_ctor(&self->super, type, parent, &self->trigger_value, Action_prepare, Action_cleanup, NULL); + TriggerDataQueue_ctor(&self->trigger_data_queue, value_buf, value_size, value_capacity); + Trigger_ctor(&self->super, type, parent, &self->trigger_data_queue, Action_prepare, Action_cleanup, NULL); self->min_offset = min_offset; self->min_spacing = min_spacing; self->previous_event = NEVER_TAG; @@ -46,15 +46,15 @@ void Action_ctor(Action *self, TriggerType type, interval_t min_offset, interval lf_ret_t LogicalAction_schedule(Action *self, interval_t offset, const void *value) { Environment *env = self->super.parent->env; Scheduler *sched = &env->scheduler; - tag_t proposed_tag = lf_delay_tag(env->current_tag, offset); + tag_t proposed_tag = lf_delay_tag(sched->current_tag, offset); tag_t earliest_allowed = lf_delay_tag(self->previous_event, self->min_spacing); if (lf_tag_compare(proposed_tag, earliest_allowed) < 0) { return LF_INVALID_TAG; } if (value) { - self->trigger_value.stage(&self->trigger_value, value); - self->trigger_value.push(&self->trigger_value); + self->trigger_data_queue.stage(&self->trigger_data_queue, value); + self->trigger_data_queue.push(&self->trigger_data_queue); } else { return LF_INVALID_VALUE; } @@ -87,8 +87,8 @@ lf_ret_t PhysicalAction_schedule(Action *self, interval_t offset, const void *va } if (value) { - self->trigger_value.stage(&self->trigger_value, value); - self->trigger_value.push(&self->trigger_value); + self->trigger_data_queue.stage(&self->trigger_data_queue, value); + self->trigger_data_queue.push(&self->trigger_data_queue); } else { env->platform->leave_critical_section(env->platform); return LF_INVALID_VALUE; diff --git a/src/connection.c b/src/connection.c index c01da25b..6480c445 100644 --- a/src/connection.c +++ b/src/connection.c @@ -1,7 +1,7 @@ #include "reactor-uc/connection.h" #include "reactor-uc/environment.h" #include "reactor-uc/logging.h" -#include "reactor-uc/trigger_value.h" +#include "reactor-uc/trigger_data_queue.h" #include #include @@ -62,7 +62,7 @@ void LogicalConnection_trigger_downstreams(Connection *self, const void *value, } void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams, - TriggerValue *trigger_value, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), + TriggerDataQueue *trigger_data_queue, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), void (*trigger_downstreams)(Connection *, const void *, size_t)) { self->upstream = NULL; @@ -73,7 +73,7 @@ void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port * self->get_final_upstream = Connection_get_final_upstream; self->trigger_downstreams = trigger_downstreams; - Trigger_ctor(&self->super, type, parent, trigger_value, prepare, cleanup, NULL); + Trigger_ctor(&self->super, type, parent, trigger_data_queue, prepare, cleanup, NULL); } void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams) { @@ -92,19 +92,19 @@ void DelayedConnection_prepare(Trigger *trigger) { LF_DEBUG(CONN, "Preparing delayed connection %p for triggering", trigger); DelayedConnection *self = (DelayedConnection *)trigger; Scheduler *sched = &trigger->parent->env->scheduler; - TriggerValue *tval = &self->trigger_value; + TriggerDataQueue *tval = &self->trigger_data_queue; void *value_ptr = (void *)&tval->buffer[tval->read_idx * tval->value_size]; trigger->is_present = true; sched->register_for_cleanup(sched, trigger); - LogicalConnection_trigger_downstreams(&self->super, value_ptr, self->trigger_value.value_size); + LogicalConnection_trigger_downstreams(&self->super, value_ptr, self->trigger_data_queue.value_size); } /** * @brief Called at the end of logical tags. In charge of two things: - * 1. Increment `read_idx` of the TriggerValue FIFO through the `pop` call. - * 2. Increment the `write_idx` of the TriggerValue FIFO (`push`) and schedule + * 1. Increment `read_idx` of the TriggerDataQueue FIFO through the `pop` call. + * 2. Increment the `write_idx` of the TriggerDataQueue FIFO (`push`) and schedule * an event based on the delay of this connection. */ void DelayedConnection_cleanup(Trigger *trigger) { @@ -115,17 +115,17 @@ void DelayedConnection_cleanup(Trigger *trigger) { if (trigger->is_present) { LF_DEBUG(CONN, "Delayed connection %p had a present value this tag. Pop it", trigger); trigger->is_present = false; - int ret = self->trigger_value.pop(&self->trigger_value); + int ret = self->trigger_data_queue.pop(&self->trigger_data_queue); validaten(ret); } - if (self->trigger_value.staged) { + if (self->trigger_data_queue.staged) { LF_DEBUG(CONN, "Delayed connection %p had a staged value. Schedule it", trigger); Environment *env = self->super.super.parent->env; Scheduler *sched = &env->scheduler; - tag_t tag = lf_delay_tag(env->current_tag, self->delay); - self->trigger_value.push(&self->trigger_value); + tag_t tag = lf_delay_tag(sched->current_tag, self->delay); + self->trigger_data_queue.push(&self->trigger_data_queue); sched->schedule_at(sched, trigger, tag); } } @@ -141,9 +141,9 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value, Scheduler *sched = &_self->super.parent->env->scheduler; if (value) { - self->trigger_value.stage(&self->trigger_value, value); + self->trigger_data_queue.stage(&self->trigger_data_queue, value); } else { - validate(false); + throw("DelayedConnection with untyped value"); } sched->register_for_cleanup(sched, &_self->super); } @@ -151,8 +151,8 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value, void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size, size_t value_capacity) { self->delay = delay; - TriggerValue_ctor(&self->trigger_value, value_buf, value_size, value_capacity); - Connection_ctor(&self->super, TRIG_CONN_DELAYED, parent, downstreams, num_downstreams, &self->trigger_value, + TriggerDataQueue_ctor(&self->trigger_data_queue, value_buf, value_size, value_capacity); + Connection_ctor(&self->super, TRIG_CONN_DELAYED, parent, downstreams, num_downstreams, &self->trigger_data_queue, DelayedConnection_prepare, DelayedConnection_cleanup, DelayedConnection_trigger_downstreams); } @@ -160,13 +160,13 @@ void PhysicalConnection_prepare(Trigger *trigger) { LF_DEBUG(CONN, "Preparing physical connection %p for triggering", trigger); PhysicalConnection *self = (PhysicalConnection *)trigger; Scheduler *sched = &trigger->parent->env->scheduler; - TriggerValue *tval = &self->trigger_value; + TriggerDataQueue *tval = &self->trigger_data_queue; void *value_ptr = (void *)&tval->buffer[tval->read_idx * tval->value_size]; trigger->is_present = true; sched->register_for_cleanup(sched, trigger); - LogicalConnection_trigger_downstreams(&self->super, value_ptr, self->trigger_value.value_size); + LogicalConnection_trigger_downstreams(&self->super, value_ptr, self->trigger_data_queue.value_size); } void PhysicalConnection_cleanup(Trigger *trigger) { @@ -177,18 +177,18 @@ void PhysicalConnection_cleanup(Trigger *trigger) { if (trigger->is_present) { LF_DEBUG(CONN, "Physical connection %p had a present value this tag. Pop it", trigger); trigger->is_present = false; - int ret = self->trigger_value.pop(&self->trigger_value); + int ret = self->trigger_data_queue.pop(&self->trigger_data_queue); validate(ret == 0); } - if (self->trigger_value.staged) { + if (self->trigger_data_queue.staged) { LF_DEBUG(CONN, "Physical connection %p had a staged value. Schedule it", trigger); Environment *env = self->super.super.parent->env; Scheduler *sched = &env->scheduler; tag_t now_tag = {.time = env->get_physical_time(env), .microstep = 0}; tag_t tag = lf_delay_tag(now_tag, self->delay); - validaten(self->trigger_value.push(&self->trigger_value)); + validaten(self->trigger_data_queue.push(&self->trigger_data_queue)); validaten(sched->schedule_at(sched, trigger, tag)); } } @@ -201,7 +201,7 @@ void PhysicalConnection_trigger_downstreams(Connection *_self, const void *value validate(value); // Try to stage the value for scheduling. - int res = self->trigger_value.stage(&self->trigger_value, value); + int res = self->trigger_data_queue.stage(&self->trigger_data_queue, value); if (res == LF_OK) { sched->register_for_cleanup(sched, &_self->super); } @@ -210,8 +210,8 @@ void PhysicalConnection_trigger_downstreams(Connection *_self, const void *value void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size, size_t value_capacity) { - TriggerValue_ctor(&self->trigger_value, value_buf, value_size, value_capacity); - Connection_ctor(&self->super, TRIG_CONN_PHYSICAL, parent, downstreams, num_downstreams, &self->trigger_value, + TriggerDataQueue_ctor(&self->trigger_data_queue, value_buf, value_size, value_capacity); + Connection_ctor(&self->super, TRIG_CONN_PHYSICAL, parent, downstreams, num_downstreams, &self->trigger_data_queue, PhysicalConnection_prepare, PhysicalConnection_cleanup, PhysicalConnection_trigger_downstreams); self->delay = delay; } diff --git a/src/environment.c b/src/environment.c index dca45066..0f4200e3 100644 --- a/src/environment.c +++ b/src/environment.c @@ -20,18 +20,25 @@ lf_ret_t Environment_wait_until(Environment *self, instant_t wakeup_time) { } } -void Environment_set_timeout(Environment *self, interval_t duration) { - self->stop_tag.microstep = 0; - self->stop_tag.time = self->start_time + duration; +interval_t Environment_get_logical_time(Environment *self) { return self->scheduler.current_tag.time; } +interval_t Environment_get_elapsed_logical_time(Environment *self) { + return self->scheduler.current_tag.time - self->scheduler.start_time; } - -interval_t Environment_get_logical_time(Environment *self) { return self->current_tag.time; } -interval_t Environment_get_elapsed_logical_time(Environment *self) { return self->current_tag.time - self->start_time; } interval_t Environment_get_physical_time(Environment *self) { return self->platform->get_physical_time(self->platform); } interval_t Environment_get_elapsed_physical_time(Environment *self) { - return self->platform->get_physical_time(self->platform) - self->start_time; + return self->platform->get_physical_time(self->platform) - self->scheduler.start_time; +} +void Environment_enter_critical_section(Environment *self) { + if (self->has_async_events) { + self->platform->enter_critical_section(self->platform); + } +} +void Environment_leave_critical_section(Environment *self) { + if (self->has_async_events) { + self->platform->leave_critical_section(self->platform); + } } void Environment_ctor(Environment *self, Reactor *main) { @@ -43,24 +50,16 @@ void Environment_ctor(Environment *self, Reactor *main) { self->assemble = Environment_assemble; self->start = Environment_start; self->wait_until = Environment_wait_until; - self->set_timeout = Environment_set_timeout; self->get_elapsed_logical_time = Environment_get_elapsed_logical_time; self->get_logical_time = Environment_get_logical_time; self->get_physical_time = Environment_get_physical_time; self->get_elapsed_physical_time = Environment_get_elapsed_physical_time; - - self->keep_alive = false; + self->leave_critical_section = Environment_leave_critical_section; + self->enter_critical_section = Environment_enter_critical_section; self->has_async_events = false; self->startup = NULL; self->shutdown = NULL; - self->stop_tag = FOREVER_TAG; Scheduler_ctor(&self->scheduler, self); - self->current_tag = NEVER_TAG; - - // Set start time - // TODO: This must be resolved in the federation. Currently set start tag to nearest second. - self->start_time = ((self->platform->get_physical_time(self->platform) + SEC(1)) / SEC(1)) * SEC(1); - LF_INFO(ENV, "Start time: %" PRId64, self->start_time); } void Environment_free(Environment *self) { diff --git a/src/federated.c b/src/federated.c index 3c7edd0e..ebd92b9d 100644 --- a/src/federated.c +++ b/src/federated.c @@ -24,6 +24,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { LF_DEBUG(FED, "Cleaning up federated output connection %p", trigger); FederatedOutputConnection *self = (FederatedOutputConnection *)trigger; Environment *env = trigger->parent->env; + Scheduler *sched = &env->scheduler; NetworkChannel *channel = self->bundle->net_channel; validate(trigger->is_registered_for_cleanup); validaten(trigger->is_present); @@ -32,8 +33,8 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { TaggedMessage msg; msg.conn_id = self->conn_id; - msg.tag.time = env->current_tag.time; - msg.tag.microstep = env->current_tag.microstep; + msg.tag.time = sched->current_tag.time; + msg.tag.microstep = sched->current_tag.microstep; memcpy(msg.payload.bytes, self->value_ptr, self->value_size); msg.payload.size = self->value_size; @@ -64,13 +65,13 @@ void FederatedInputConnection_prepare(Trigger *trigger) { LF_DEBUG(FED, "Preparing federated input connection %p for triggering", trigger); FederatedInputConnection *self = (FederatedInputConnection *)trigger; Scheduler *sched = &trigger->parent->env->scheduler; - TriggerValue *tval = &self->trigger_value; + TriggerDataQueue *tval = &self->trigger_data_queue; void *value_ptr = (void *)&tval->buffer[tval->read_idx * tval->value_size]; trigger->is_present = true; sched->register_for_cleanup(sched, trigger); - LogicalConnection_trigger_downstreams(&self->super, value_ptr, self->trigger_value.value_size); + LogicalConnection_trigger_downstreams(&self->super, value_ptr, self->trigger_data_queue.value_size); } // Called at the end of a logical tag if it was registered for cleanup. @@ -81,20 +82,20 @@ void FederatedInputConnection_cleanup(Trigger *trigger) { if (trigger->is_present) { trigger->is_present = false; - int ret = self->trigger_value.pop(&self->trigger_value); + int ret = self->trigger_data_queue.pop(&self->trigger_data_queue); validaten(ret); } // Should never happen. - validaten(self->trigger_value.staged); + validaten(self->trigger_data_queue.staged); } void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *parent, interval_t delay, bool is_physical, Port **downstreams, size_t downstreams_size, void *value_buf, size_t value_size, size_t value_capacity) { - TriggerValue_ctor(&self->trigger_value, value_buf, value_size, value_capacity); - Connection_ctor(&self->super, TRIG_CONN_FEDERATED_INPUT, parent, downstreams, downstreams_size, &self->trigger_value, - FederatedInputConnection_prepare, FederatedInputConnection_cleanup, NULL); + TriggerDataQueue_ctor(&self->trigger_data_queue, value_buf, value_size, value_capacity); + Connection_ctor(&self->super, TRIG_CONN_FEDERATED_INPUT, parent, downstreams, downstreams_size, + &self->trigger_data_queue, FederatedInputConnection_prepare, FederatedInputConnection_cleanup, NULL); self->delay = delay; self->is_physical = is_physical; self->last_known_tag = NEVER_TAG; @@ -122,10 +123,10 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, tag = lf_delay_tag(tag, input->delay); LF_DEBUG(FED, "Scheduling input %p at tag=%" PRId64 ":%" PRIu32, input, tag.time, tag.microstep); - // Take the value received over the network copy it into the trigger_value of + // Take the value received over the network copy it into the trigger_data_queue of // the input port and schedule an event for it. - input->trigger_value.stage(&input->trigger_value, &msg->payload.bytes); - input->trigger_value.push(&input->trigger_value); + input->trigger_data_queue.stage(&input->trigger_data_queue, &msg->payload.bytes); + input->trigger_data_queue.push(&input->trigger_data_queue); lf_ret_t ret = sched->schedule_at_locked(sched, &input->super.super, tag); if (ret == LF_OK) { env->platform->new_async_event(env->platform); diff --git a/src/platform/posix/posix.c b/src/platform/posix/posix.c index 2d66ebf2..54a77cc3 100644 --- a/src/platform/posix/posix.c +++ b/src/platform/posix/posix.c @@ -35,7 +35,7 @@ instant_t PlatformPosix_get_physical_time(Platform *self) { (void)self; struct timespec tspec; if (clock_gettime(CLOCK_REALTIME, (struct timespec *)&tspec) != 0) { - validate(false); + throw("POSIX could not get physical time"); } return convert_timespec_to_ns(tspec); } diff --git a/src/reactor.c b/src/reactor.c index 2b8b3352..cf483f24 100644 --- a/src/reactor.c +++ b/src/reactor.c @@ -13,8 +13,9 @@ void Reactor_register_startup(Reactor *self, Startup *startup) { (void)self; LF_DEBUG(ENV, "Registering startup trigger %p with Reactor %s", startup, self->name); Environment *env = self->env; + Scheduler *sched = &env->scheduler; if (!env->startup) { - tag_t start_tag = {.microstep = 0, .time = self->env->start_time}; + tag_t start_tag = {.microstep = 0, .time = sched->start_time}; validaten(env->scheduler.schedule_at(&env->scheduler, &startup->super, start_tag)); env->startup = startup; } else { diff --git a/src/scheduler.c b/src/scheduler.c index fbbd76c1..f517b82e 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -3,80 +3,13 @@ #include "reactor-uc/logging.h" #include "reactor-uc/reactor-uc.h" -void Scheduler_register_for_cleanup(Scheduler *self, Trigger *trigger) { - LF_DEBUG(SCHED, "Registering trigger %p for cleanup", trigger); - if (trigger->is_registered_for_cleanup) { - return; - } - - if (self->cleanup_ll_head) { - self->cleanup_ll_tail->next = trigger; - self->cleanup_ll_tail = trigger; - } else { - validaten(self->cleanup_ll_tail); - self->cleanup_ll_head = trigger; - self->cleanup_ll_tail = trigger; - } - trigger->is_registered_for_cleanup = true; -} - -void Scheduler_prepare_timestep(Scheduler *self, tag_t tag) { - LF_DEBUG(SCHED, "Preparing timestep for tag %" PRId64 ":%" PRIu32, tag.time, tag.microstep); - self->env->current_tag = tag; - self->executing_tag = true; - self->reaction_queue.reset(&self->reaction_queue); -} - -void Scheduler_clean_up_timestep(Scheduler *self) { - assert(self->executing_tag); - assert(self->reaction_queue.empty(&self->reaction_queue)); - LF_DEBUG(SCHED, "Cleaning up timestep for tag %" PRId64 ":%" PRIu32, self->env->current_tag.time, - self->env->current_tag.microstep); - self->executing_tag = false; - Trigger *cleanup_trigger = self->cleanup_ll_head; - - while (cleanup_trigger) { - Trigger *this = cleanup_trigger; - assert(!(this->next == NULL && this != self->cleanup_ll_tail)); - this->cleanup(this); - cleanup_trigger = this->next; - this->next = NULL; - this->is_registered_for_cleanup = false; - } +// Private functions - self->cleanup_ll_head = NULL; - self->cleanup_ll_tail = NULL; -} - -void Scheduler_run_timestep(Scheduler *self) { - while (!self->reaction_queue.empty(&self->reaction_queue)) { - Reaction *reaction = self->reaction_queue.pop(&self->reaction_queue); - validate(reaction); - LF_DEBUG(SCHED, "Executing %s->reaction_%d", reaction->parent->name, reaction->index); - reaction->body(reaction); - } -} - -void Scheduler_terminate(Scheduler *self) { - LF_INFO(SCHED, "Scheduler terminating"); - Environment *env = self->env; - self->prepare_timestep(self, env->stop_tag); - - if (env->has_async_events) { - env->platform->leave_critical_section(env->platform); - } - - Trigger *shutdown = &self->env->shutdown->super; - while (shutdown) { - LF_DEBUG(SCHED, "Doing shutdown trigger %p", shutdown); - shutdown->prepare(shutdown); - shutdown = shutdown->next; - } - self->run_timestep(self); - self->clean_up_timestep(self); -} - -void Scheduler_handle_builtin(Trigger *trigger) { +/** + * @brief Builtin triggers (startup/shutdown) are chained together as a linked + * list and to prepare such a trigger we must iterate through the list. + */ +static void Scheduler_prepare_builtin(Trigger *trigger) { do { trigger->prepare(trigger); if (trigger->type == TRIG_STARTUP) { @@ -87,15 +20,19 @@ void Scheduler_handle_builtin(Trigger *trigger) { } while (trigger); } -void Scheduler_pop_events(Scheduler *self, tag_t next_tag) { +/** + * @brief Pop off all the events from the event queue which have a tag matching + * `next_tag` and prepare the associated triggers. + */ +static void Scheduler_pop_events_and_prepare(Scheduler *self, tag_t next_tag) { do { Event event = self->event_queue.pop(&self->event_queue); - validate(lf_tag_compare(event.tag, next_tag) == 0); + assert(lf_tag_compare(event.tag, next_tag) == 0); LF_DEBUG(SCHED, "Handling event %p for tag %" PRId64 ":%" PRIu32, &event, event.tag.time, event.tag.microstep); Trigger *trigger = event.trigger; if (trigger->type == TRIG_STARTUP || trigger->type == TRIG_SHUTDOWN) { - Scheduler_handle_builtin(trigger); + Scheduler_prepare_builtin(trigger); } else { trigger->prepare(trigger); } @@ -111,7 +48,7 @@ void Scheduler_pop_events(Scheduler *self, tag_t next_tag) { * @param next_tag * @return lf_ret_t */ -lf_ret_t Scheduler_acquire_tag(Scheduler *self, tag_t next_tag) { +static lf_ret_t Scheduler_federated_acquire_tag(Scheduler *self, tag_t next_tag) { LF_DEBUG(SCHED, "Acquiring tag %" PRId64 ":%" PRIu32, next_tag.time, next_tag.microstep); Environment *env = self->env; Reactor *main = env->main; @@ -135,31 +72,99 @@ lf_ret_t Scheduler_acquire_tag(Scheduler *self, tag_t next_tag) { if (additional_sleep > 0) { LF_DEBUG(SCHED, "Need to sleep for additional %" PRId64 " ns", additional_sleep); - instant_t now = env->get_physical_time(env); - return env->wait_until(env, lf_time_add(now, additional_sleep)); + instant_t sleep_until = lf_time_add(env->get_logical_time(env), additional_sleep); + return env->wait_until(env, sleep_until); } else { return LF_OK; } } +void Scheduler_register_for_cleanup(Scheduler *self, Trigger *trigger) { + LF_DEBUG(SCHED, "Registering trigger %p for cleanup", trigger); + if (trigger->is_registered_for_cleanup) { + return; + } + + if (self->cleanup_ll_head == NULL) { + assert(self->cleanup_ll_tail == NULL); + self->cleanup_ll_head = trigger; + self->cleanup_ll_tail = trigger; + } else { + self->cleanup_ll_tail->next = trigger; + self->cleanup_ll_tail = trigger; + } + trigger->is_registered_for_cleanup = true; +} + +void Scheduler_prepare_timestep(Scheduler *self, tag_t tag) { + LF_DEBUG(SCHED, "Preparing timestep for tag %" PRId64 ":%" PRIu32, tag.time, tag.microstep); + self->current_tag = tag; + self->reaction_queue.reset(&self->reaction_queue); +} + +void Scheduler_clean_up_timestep(Scheduler *self) { + assert(self->reaction_queue.empty(&self->reaction_queue)); + + assert(self->cleanup_ll_head && self->cleanup_ll_tail); + LF_DEBUG(SCHED, "Cleaning up timestep for tag %" PRId64 ":%" PRIu32, self->current_tag.time, + self->current_tag.microstep); + Trigger *cleanup_trigger = self->cleanup_ll_head; + + while (cleanup_trigger) { + Trigger *this = cleanup_trigger; + assert(!(this->next == NULL && this != self->cleanup_ll_tail)); + this->cleanup(this); + this->is_registered_for_cleanup = false; + cleanup_trigger = this->next; + this->next = NULL; + } + + self->cleanup_ll_head = NULL; + self->cleanup_ll_tail = NULL; +} + +void Scheduler_run_timestep(Scheduler *self) { + while (!self->reaction_queue.empty(&self->reaction_queue)) { + Reaction *reaction = self->reaction_queue.pop(&self->reaction_queue); + assert(reaction); + LF_DEBUG(SCHED, "Executing %s->reaction_%d", reaction->parent->name, reaction->index); + reaction->body(reaction); + } +} + +void Scheduler_terminate(Scheduler *self) { + LF_INFO(SCHED, "Scheduler terminating"); + Environment *env = self->env; + self->prepare_timestep(self, self->stop_tag); + + env->leave_critical_section(env); + + Trigger *shutdown = &self->env->shutdown->super; + if (shutdown) { + Scheduler_prepare_builtin(shutdown); + self->run_timestep(self); + self->clean_up_timestep(self); + } +} + void Scheduler_run(Scheduler *self) { Environment *env = self->env; - int res = 0; + lf_ret_t res = 0; bool do_shutdown = false; - bool keep_alive = env->keep_alive || env->has_async_events; - LF_INFO(SCHED, "Scheduler running with keep_alive=%d has_async_events=%d", keep_alive, env->has_async_events); + bool non_terminating = self->keep_alive || env->has_async_events; + LF_INFO(SCHED, "Scheduler running with non_terminating=%d has_async_events=%d", non_terminating, + env->has_async_events); - if (env->has_async_events) { - env->platform->enter_critical_section(env->platform); - } + env->enter_critical_section(env); - while (keep_alive || !self->event_queue.empty(&self->event_queue)) { + while (non_terminating || !self->event_queue.empty(&self->event_queue)) { tag_t next_tag = self->event_queue.next_tag(&self->event_queue); LF_DEBUG(SCHED, "Next event is at %" PRId64 ":%" PRIu32, next_tag.time, next_tag.microstep); - if (lf_tag_compare(next_tag, self->env->stop_tag) > 0) { - LF_INFO(SCHED, "Next event is beyond stop tag: %" PRId64 ":%" PRIu32, self->env->stop_tag.time, - self->env->stop_tag.microstep); - next_tag = self->env->stop_tag; + + if (lf_tag_compare(next_tag, self->stop_tag) > 0) { + LF_INFO(SCHED, "Next event is beyond stop tag: %" PRId64 ":%" PRIu32, self->stop_tag.time, + self->stop_tag.microstep); + next_tag = self->stop_tag; do_shutdown = true; } else { do_shutdown = false; @@ -170,57 +175,53 @@ void Scheduler_run(Scheduler *self) { LF_DEBUG(SCHED, "Sleep interrupted before completion"); continue; } else if (res != LF_OK) { - validate(false); + throw("Sleep failed"); } - // Acquire tag - res = Scheduler_acquire_tag(self, next_tag); + // For federated execution, acquire next_tag before proceeding. This function + // might sleep and will return LF_SLEEP_INTERRUPTED if sleep was interrupted. + res = Scheduler_federated_acquire_tag(self, next_tag); if (res == LF_SLEEP_INTERRUPTED) { LF_DEBUG(SCHED, "Sleep interrupted while waiting for STAA"); continue; } + // Once we are here, we have are committed to executing `next_tag`. if (do_shutdown) { break; } self->prepare_timestep(self, next_tag); - Scheduler_pop_events(self, next_tag); + Scheduler_pop_events_and_prepare(self, next_tag); LF_DEBUG(SCHED, "Acquired tag %" PRId64 ":%" PRIu32, next_tag.time, next_tag.microstep); - // TODO: The critical section could be smaller. - if (env->has_async_events) { - env->platform->leave_critical_section(env->platform); - } + env->leave_critical_section(env); self->run_timestep(self); self->clean_up_timestep(self); - if (env->has_async_events) { - env->platform->enter_critical_section(env->platform); - } + env->enter_critical_section(env); } self->terminate(self); } lf_ret_t Scheduler_schedule_at_locked(Scheduler *self, Trigger *trigger, tag_t tag) { - Environment *env = self->env; Event event = {.tag = tag, .trigger = trigger}; // Check if we are trying to schedule past stop tag - if (lf_tag_compare(tag, env->stop_tag) > 0) { + if (lf_tag_compare(tag, self->stop_tag) > 0) { LF_WARN(SCHED, "Trying to schedule trigger %p at tag %" PRId64 ":%" PRIu32 " past stop tag %" PRId64 ":%" PRIu32, - trigger, tag.time, tag.microstep, env->stop_tag.time, env->stop_tag.microstep); + trigger, tag.time, tag.microstep, self->stop_tag.time, self->stop_tag.microstep); return LF_AFTER_STOP_TAG; } // Check if we are tring to schedule into the past - if (lf_tag_compare(tag, env->current_tag) <= 0) { + if (lf_tag_compare(tag, self->current_tag) <= 0) { LF_WARN(SCHED, "Trying to schedule trigger %p at tag %" PRId64 ":%" PRIu32 " which is before current tag %" PRId64 ":%" PRIu32, - trigger, tag.time, tag.microstep, env->current_tag.time, env->current_tag.microstep); + trigger, tag.time, tag.microstep, self->current_tag.time, self->current_tag.microstep); return LF_PAST_TAG; } @@ -235,18 +236,20 @@ lf_ret_t Scheduler_schedule_at_locked(Scheduler *self, Trigger *trigger, tag_t t lf_ret_t Scheduler_schedule_at(Scheduler *self, Trigger *trigger, tag_t tag) { Environment *env = self->env; - if (env->has_async_events) { - env->platform->enter_critical_section(env->platform); - } + env->enter_critical_section(env); int res = self->schedule_at_locked(self, trigger, tag); - if (env->has_async_events) { - env->platform->leave_critical_section(env->platform); - } + env->leave_critical_section(env); + return res; } +void Scheduler_set_timeout(Scheduler *self, interval_t duration) { + self->stop_tag.microstep = 0; + self->stop_tag.time = self->start_time + duration; +} + void Scheduler_ctor(Scheduler *self, Environment *env) { self->env = env; self->run = Scheduler_run; @@ -257,9 +260,17 @@ void Scheduler_ctor(Scheduler *self, Environment *env) { self->schedule_at = Scheduler_schedule_at; self->schedule_at_locked = Scheduler_schedule_at_locked; self->register_for_cleanup = Scheduler_register_for_cleanup; - self->executing_tag = false; + self->set_timeout = Scheduler_set_timeout; + self->keep_alive = false; + self->stop_tag = FOREVER_TAG; + self->current_tag = NEVER_TAG; self->cleanup_ll_head = NULL; self->cleanup_ll_tail = NULL; EventQueue_ctor(&self->event_queue); ReactionQueue_ctor(&self->reaction_queue); + + // Set start time + // FIXMEi: This must be resolved in the federation. Currently set start tag to nearest second. + self->start_time = ((self->env->platform->get_physical_time(self->env->platform) + SEC(1)) / SEC(1)) * SEC(1); + LF_INFO(ENV, "Start time: %" PRId64, self->start_time); } diff --git a/src/tag.c b/src/tag.c index 2a1eccb9..e69d8226 100644 --- a/src/tag.c +++ b/src/tag.c @@ -11,18 +11,6 @@ #include "reactor-uc/tag.h" #include "reactor-uc/environment.h" -/** - * An enum for specifying the desired tag when calling "lf_time" - */ -typedef enum { LF_LOGICAL, LF_PHYSICAL, LF_ELAPSED_LOGICAL, LF_ELAPSED_PHYSICAL, LF_START } lf_time_type; - -//////////////// Functions declared in tag.h - -tag_t lf_tag(void *env) { - (void)env; - return ((Environment *)env)->current_tag; -} - instant_t lf_time_add(instant_t time, interval_t interval) { if (time == NEVER || interval == NEVER) { return NEVER; diff --git a/src/timer.c b/src/timer.c index 75aa1b89..ac571c81 100644 --- a/src/timer.c +++ b/src/timer.c @@ -24,7 +24,7 @@ void Timer_cleanup(Trigger *_self) { // Schedule next event unless it is a single-shot timer. if (self->period > NEVER) { - tag_t next_tag = lf_delay_tag(env->current_tag, self->period); + tag_t next_tag = lf_delay_tag(sched->current_tag, self->period); sched->schedule_at(sched, _self, next_tag); } } @@ -41,6 +41,6 @@ void Timer_ctor(Timer *self, Reactor *parent, instant_t offset, interval_t perio // Schedule first Scheduler *sched = &self->super.parent->env->scheduler; - tag_t tag = {.microstep = 0, .time = offset + self->super.parent->env->start_time}; + tag_t tag = {.microstep = 0, .time = offset + sched->start_time}; sched->schedule_at(sched, &self->super, tag); } diff --git a/src/trigger.c b/src/trigger.c index ca13efe1..50200d49 100644 --- a/src/trigger.c +++ b/src/trigger.c @@ -5,8 +5,8 @@ #include const void *Trigger_get(Trigger *self) { - if (self->trigger_value) { - TriggerValue *tval = self->trigger_value; + if (self->trigger_data_queue) { + TriggerDataQueue *tval = self->trigger_data_queue; return &tval->buffer[tval->read_idx * tval->value_size]; } else { assert(false); @@ -14,7 +14,7 @@ const void *Trigger_get(Trigger *self) { } } -void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, TriggerValue *trigger_value, +void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, TriggerDataQueue *trigger_data_queue, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), const void *(*get)(Trigger *)) { self->type = type; self->parent = parent; @@ -23,7 +23,7 @@ void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, TriggerValue self->is_registered_for_cleanup = false; self->prepare = prepare; self->cleanup = cleanup; - self->trigger_value = trigger_value; + self->trigger_data_queue = trigger_data_queue; if (get) { self->get = get; } else { diff --git a/src/trigger_value.c b/src/trigger_data_queue.c similarity index 59% rename from src/trigger_value.c rename to src/trigger_data_queue.c index 9e1fe45a..f57d3f52 100644 --- a/src/trigger_value.c +++ b/src/trigger_data_queue.c @@ -1,12 +1,12 @@ -#include "reactor-uc/trigger_value.h" +#include "reactor-uc/trigger_data_queue.h" #include "reactor-uc/logging.h" #include #include #include -lf_ret_t TriggerValue_stage(TriggerValue *self, const void *value) { +lf_ret_t TriggerDataQueue_stage(TriggerDataQueue *self, const void *value) { if (!self->empty && self->read_idx == self->write_idx) { - LF_ERR(TRIG, "Could not stage value, TriggerValue %p is full", self); + LF_ERR(TRIG, "Could not stage value, TriggerDataQueue %p is full", self); return LF_OUT_OF_BOUNDS; } memcpy(self->buffer + self->write_idx * self->value_size, value, self->value_size); // NOLINT @@ -14,9 +14,9 @@ lf_ret_t TriggerValue_stage(TriggerValue *self, const void *value) { return LF_OK; } -lf_ret_t TriggerValue_push(TriggerValue *self) { +lf_ret_t TriggerDataQueue_push(TriggerDataQueue *self) { if (!self->staged) { - LF_ERR(TRIG, "Could not push value, no value staged in TriggerValue %p", self); + LF_ERR(TRIG, "Could not push value, no value staged in TriggerDataQueue %p", self); return LF_INVALID_VALUE; } @@ -27,9 +27,9 @@ lf_ret_t TriggerValue_push(TriggerValue *self) { return 0; } -lf_ret_t TriggerValue_pop(TriggerValue *self) { +lf_ret_t TriggerDataQueue_pop(TriggerDataQueue *self) { if (self->empty) { - LF_ERR(TRIG, "Could not pop value, TriggerValue %p is empty", self); + LF_ERR(TRIG, "Could not pop value, TriggerDataQueue %p is empty", self); return LF_EMPTY; } @@ -41,7 +41,7 @@ lf_ret_t TriggerValue_pop(TriggerValue *self) { return LF_OK; } -void TriggerValue_ctor(TriggerValue *self, char *buffer, size_t value_size, size_t capacity) { +void TriggerDataQueue_ctor(TriggerDataQueue *self, char *buffer, size_t value_size, size_t capacity) { self->buffer = buffer; self->value_size = value_size; self->capacity = capacity; @@ -49,7 +49,7 @@ void TriggerValue_ctor(TriggerValue *self, char *buffer, size_t value_size, size self->write_idx = 0; self->empty = true; self->staged = true; - self->push = TriggerValue_push; - self->pop = TriggerValue_pop; - self->stage = TriggerValue_stage; + self->push = TriggerDataQueue_push; + self->pop = TriggerDataQueue_pop; + self->stage = TriggerDataQueue_stage; } diff --git a/test/unit/action_microstep_test.c b/test/unit/action_microstep_test.c index a1e64496..32e297dd 100644 --- a/test/unit/action_microstep_test.c +++ b/test/unit/action_microstep_test.c @@ -55,7 +55,7 @@ void action_handler(Reaction *_self) { printf("Action = %d\n", lf_get(my_action)); if (self->cnt > 0) { TEST_ASSERT_EQUAL(self->cnt, lf_get(my_action)); - TEST_ASSERT_EQUAL(self->cnt, env->current_tag.microstep); + TEST_ASSERT_EQUAL(self->cnt, env->scheduler.current_tag.microstep); TEST_ASSERT_EQUAL(true, lf_is_present(my_action)); } else { TEST_ASSERT_EQUAL(false, lf_is_present(my_action)); @@ -91,7 +91,6 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); - env.set_timeout(&env, SEC(1)); env.assemble(&env); env.start(&env); } diff --git a/test/unit/action_test.c b/test/unit/action_test.c index 2d6786ef..e6a0796e 100644 --- a/test/unit/action_test.c +++ b/test/unit/action_test.c @@ -82,7 +82,7 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); - env.set_timeout(&env, SEC(1)); + env.scheduler.set_timeout(&env.scheduler, SEC(1)); env.assemble(&env); env.start(&env); } diff --git a/test/unit/delayed_conn_test.c b/test/unit/delayed_conn_test.c index 03c1dbb9..9295d666 100644 --- a/test/unit/delayed_conn_test.c +++ b/test/unit/delayed_conn_test.c @@ -145,7 +145,7 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&main); Main_ctor(&main, &env); - env.set_timeout(&env, SEC(1)); + env.scheduler.set_timeout(&env.scheduler, SEC(1)); env.assemble(&env); env.start(&env); } diff --git a/test/unit/physical_action_test.c b/test/unit/physical_action_test.c index 30080241..e1b01b29 100644 --- a/test/unit/physical_action_test.c +++ b/test/unit/physical_action_test.c @@ -135,7 +135,7 @@ void test_simple() { struct MyReactor my_reactor; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); - env.set_timeout(&env, SEC(1)); + env.scheduler.set_timeout(&env.scheduler, SEC(1)); env.assemble(&env); env.start(&env); } diff --git a/test/unit/port_test.c b/test/unit/port_test.c index a67bb779..38f4984b 100644 --- a/test/unit/port_test.c +++ b/test/unit/port_test.c @@ -139,7 +139,7 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&main); Main_ctor(&main, &env); - env.set_timeout(&env, SEC(1)); + env.scheduler.set_timeout(&env.scheduler, SEC(1)); env.assemble(&env); env.start(&env); } diff --git a/test/unit/timer_test.c b/test/unit/timer_test.c index 278e9c5b..9e0acdb7 100644 --- a/test/unit/timer_test.c +++ b/test/unit/timer_test.c @@ -20,7 +20,8 @@ struct MyReactor { void timer_handler(Reaction *_self) { struct MyReactor *self = (struct MyReactor *)_self->parent; - printf("Hello World @ %ld\n", self->super.env->current_tag.time); + Environment *env = self->super.env; + printf("Hello World @ %ld\n", env->get_elapsed_logical_time(env)); } void MyReaction_ctor(MyReaction *self, Reactor *parent) { @@ -40,7 +41,7 @@ void test_simple() { struct MyReactor my_reactor; Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); - env.set_timeout(&env, SEC(1)); + env.scheduler.set_timeout(&env.scheduler, SEC(1)); MyReactor_ctor(&my_reactor, &env); env.assemble(&env); env.start(&env); diff --git a/test/unit/trigger_value_test.c b/test/unit/trigger_data_queue_test.c similarity index 79% rename from test/unit/trigger_value_test.c rename to test/unit/trigger_data_queue_test.c index 267876cb..46bd6a07 100644 --- a/test/unit/trigger_value_test.c +++ b/test/unit/trigger_data_queue_test.c @@ -1,13 +1,13 @@ #include "unity.h" -#include "reactor-uc/trigger_value.h" +#include "reactor-uc/trigger_data_queue.h" // Verify that we can push a bunch of values and then pop them off void test_push_pop(void) { - TriggerValue t; + TriggerDataQueue t; int buffer[10]; - TriggerValue_ctor(&t, (char *)&buffer, sizeof(int), 10); + TriggerDataQueue_ctor(&t, (char *)&buffer, sizeof(int), 10); for (int j = 0; j < 3; j++) { int val = 1; @@ -27,9 +27,9 @@ void test_push_pop(void) { void test_pop_empty(void) { - TriggerValue t; + TriggerDataQueue t; int buffer[10]; - TriggerValue_ctor(&t, (char *)&buffer, sizeof(int), 10); + TriggerDataQueue_ctor(&t, (char *)&buffer, sizeof(int), 10); int val = 2; t.stage(&t, (const void *)&val); t.push(&t);