From 6eb74b1a66a20e5b373e8b8f8d6dd06a0493bede Mon Sep 17 00:00:00 2001 From: erling Date: Fri, 25 Oct 2024 14:57:11 -0700 Subject: [PATCH] Distribute the start tag within the federation (#81) * Update proto messages and have a start_tag_signal that is handled and used to distribute start tags * Fix examples * Refactor a little * Formatting * Only schedule timers and startup after start tag has been acquired * Remove unused proto messages * Fix up tests * Correctly initialize start_time * Some refactoring * Formatting * Fix pico * Try porting tcp_channel_test to new FederateMessage * Final portings --- .clang-format | 3 +- Makefile | 7 +- examples/pico/CMakeLists.txt | 5 +- examples/pico/blinky.c | 87 +++++++++++++++++++ examples/pico/timer_ex.c | 47 ---------- examples/posix/testing_fed_conn_receiver.c | 2 +- examples/posix/testing_fed_conn_sender.c | 2 +- examples/posix/timer_ex.c | 2 +- .../federated_receiver1/src/receiver.c | 2 +- .../federated_receiver2/src/receiver.c | 2 +- .../federated_sender/src/sender.c | 2 +- external/proto/message.pb.c | 8 ++ external/proto/message.pb.h | 61 ++++++++++++- external/proto/message.proto | 12 +++ include/reactor-uc/encoding.h | 8 +- include/reactor-uc/federated.h | 2 + include/reactor-uc/logging.h | 1 + include/reactor-uc/macros.h | 9 +- include/reactor-uc/network_channel.h | 5 +- .../platform/posix/tcp_ip_channel.h | 4 +- include/reactor-uc/scheduler.h | 9 +- .../lflang/generator/uc/UcMainGenerator.kt | 2 +- src/environment.c | 13 +-- src/federated.c | 78 ++++++++++++++--- src/platform/pico/pico.c | 2 +- src/platform/posix/tcp_ip_channel.c | 8 +- src/reactor.c | 7 -- src/scheduler.c | 66 ++++++++++++-- src/timer.c | 7 -- test/lf/src/Parameters.lf | 1 + test/unit/action_test.c | 2 +- test/unit/delayed_conn_test.c | 2 +- test/unit/nanopb_test.c | 24 +++-- test/unit/physical_action_test.c | 2 +- test/unit/port_test.c | 2 +- test/unit/request_shutdown_test.c | 2 +- test/unit/tcp_channel_test.c | 34 +++++--- test/unit/timer_test.c | 2 +- 38 files changed, 375 insertions(+), 159 deletions(-) create mode 100644 examples/pico/blinky.c delete mode 100644 examples/pico/timer_ex.c diff --git a/.clang-format b/.clang-format index e44df291..081ce3c8 100644 --- a/.clang-format +++ b/.clang-format @@ -3,4 +3,5 @@ BasedOnStyle: LLVM Language: Cpp IndentWidth: 2 ColumnLimit: 120 -AllowShortIfStatementsOnASingleLine: false \ No newline at end of file +AllowShortIfStatementsOnASingleLine: false +SortIncludes: false \ No newline at end of file diff --git a/Makefile b/Makefile index 1843a9f9..99e7d4d9 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,10 @@ .PHONY: clean test coverage asan format format-check ci lf-test lib proto examples -test: unit-test lf-test examples - +test: unit-test examples # Generate protobuf code proto: - python external/nanopb/generator/nanopb_generator.py -Iexternal/nanopb/generator/proto/ -Iexternal/proto -L'#include "nanopb/%s"' -Dexternal/proto message.proto + python3 external/nanopb/generator/nanopb_generator.py -Iexternal/nanopb/generator/proto/ -Iexternal/proto -L'#include "nanopb/%s"' -Dexternal/proto message.proto # Build reactor-uc as a static library lib: @@ -13,14 +12,12 @@ lib: cmake --build build make -C build - # Build examples examples: cmake -Bbuild -DBUILD_EXAMPLES=ON . cmake --build build make examples -C build - # Build and run the unit tests unit-test: cmake -Bbuild -DBUILD_TESTS=ON diff --git a/examples/pico/CMakeLists.txt b/examples/pico/CMakeLists.txt index d34e3440..15271fab 100644 --- a/examples/pico/CMakeLists.txt +++ b/examples/pico/CMakeLists.txt @@ -12,8 +12,7 @@ project(reactor-uc-pico) pico_sdk_init() add_subdirectory(../../ reactor-uc) -add_executable(hello_world - timer_ex.c +add_executable(blinky blinky.c ) -target_link_libraries(hello_world PRIVATE reactor-uc) \ No newline at end of file +target_link_libraries(blinky PRIVATE reactor-uc) \ No newline at end of file diff --git a/examples/pico/blinky.c b/examples/pico/blinky.c new file mode 100644 index 00000000..d47614c4 --- /dev/null +++ b/examples/pico/blinky.c @@ -0,0 +1,87 @@ +#include "pico/stdlib.h" +#include "reactor-uc/reactor-uc.h" + +#ifdef CYW43_WL_GPIO_LED_PIN +#include "pico/cyw43_arch.h" +#endif + +// Perform initialisation +int pico_led_init(void) { +#if defined(PICO_DEFAULT_LED_PIN) + // A device like Pico that uses a GPIO for the LED will define PICO_DEFAULT_LED_PIN + // so we can use normal GPIO functionality to turn the led on and off + gpio_init(PICO_DEFAULT_LED_PIN); + gpio_set_dir(PICO_DEFAULT_LED_PIN, GPIO_OUT); + return PICO_OK; +#elif defined(CYW43_WL_GPIO_LED_PIN) + // For Pico W devices we need to initialise the driver etc + return cyw43_arch_init(); +#else +#error "No LED pin defined for Pico" +#endif +} + +// Turn the led on or off +void pico_set_led(bool led_on) { +#if defined(PICO_DEFAULT_LED_PIN) + // Just set the GPIO on or off + gpio_put(PICO_DEFAULT_LED_PIN, led_on); +#elif defined(CYW43_WL_GPIO_LED_PIN) + // Ask the wifi "driver" to set the GPIO on or off + cyw43_arch_gpio_put(CYW43_WL_GPIO_LED_PIN, led_on); +#else +#error "No LED pin defined for Pico" +#endif +} + +typedef struct { + Timer super; + Reaction *effects[0]; +} MyTimer; + +typedef struct { + Reaction super; +} MyReaction; + +struct MyReactor { + Reactor super; + MyReaction my_reaction; + MyTimer timer; + bool led_on; + Reaction *_reactions[1]; + Trigger *_triggers[1]; +}; + +void timer_handler(Reaction *_self) { + struct MyReactor *self = (struct MyReactor *)_self->parent; + Environment *env = self->super.env; + printf("Hello World @ %lld\n", env->get_elapsed_logical_time(env)); + pico_set_led(!self->led_on); + self->led_on = !self->led_on; +} + +void MyReaction_ctor(MyReaction *self, Reactor *parent) { + Reaction_ctor(&self->super, parent, timer_handler, NULL, 0, 0); +} + +void MyReactor_ctor(struct MyReactor *self, Environment *env) { + self->_reactions[0] = (Reaction *)&self->my_reaction; + self->_triggers[0] = (Trigger *)&self->timer; + Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 1); + MyReaction_ctor(&self->my_reaction, &self->super); + Timer_ctor(&self->timer.super, &self->super, MSEC(0), MSEC(100), self->timer.effects, 1); + TIMER_REGISTER_EFFECT(self->timer, self->my_reaction); + self->led_on = false; +} + +struct MyReactor my_reactor; +Environment env; +int main() { + Environment_ctor(&env, (Reactor *)&my_reactor); + pico_led_init(); + env.scheduler.duration = FOREVER; + MyReactor_ctor(&my_reactor, &env); + env.assemble(&env); + env.start(&env); + return 0; +} diff --git a/examples/pico/timer_ex.c b/examples/pico/timer_ex.c deleted file mode 100644 index ece559bd..00000000 --- a/examples/pico/timer_ex.c +++ /dev/null @@ -1,47 +0,0 @@ -#include "reactor-uc/reactor-uc.h" - -typedef struct { - Timer super; - Reaction *effects[0]; -} MyTimer; - -typedef struct { - Reaction super; -} MyReaction; - -struct MyReactor { - Reactor super; - MyReaction my_reaction; - MyTimer timer; - Reaction *_reactions[1]; - Trigger *_triggers[1]; -}; - -void timer_handler(Reaction *_self) { - struct MyReactor *self = (struct MyReactor *)_self->parent; - printf("Hello World @ %lld\n", self->super.env->current_tag.time); -} - -void MyReaction_ctor(MyReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, timer_handler, NULL, 0, 0); -} - -void MyReactor_ctor(struct MyReactor *self, Environment *env) { - self->_reactions[0] = (Reaction *)&self->my_reaction; - self->_triggers[0] = (Trigger *)&self->timer; - Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 1); - MyReaction_ctor(&self->my_reaction, &self->super); - Timer_ctor(&self->timer.super, &self->super, MSEC(0), MSEC(100), self->timer.effects, 1); - TIMER_REGISTER_EFFECT(self->timer, self->my_reaction); -} - -struct MyReactor my_reactor; -Environment env; -int main() { - Environment_ctor(&env, (Reactor *)&my_reactor); - env.set_timeout(&env, SEC(1)); - MyReactor_ctor(&my_reactor, &env); - env.assemble(&env); - env.start(&env); - return 0; -} diff --git a/examples/posix/testing_fed_conn_receiver.c b/examples/posix/testing_fed_conn_receiver.c index a155b7dc..bb11668d 100644 --- a/examples/posix/testing_fed_conn_receiver.c +++ b/examples/posix/testing_fed_conn_receiver.c @@ -101,7 +101,7 @@ void MainRecv_ctor(MainRecv *self, Environment *env) { Reactor_ctor(&self->super, "MainRecv", env, NULL, self->_children, 1, NULL, 0, NULL, 0); } -ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1) +ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1, false) int main() { lf_start(); diff --git a/examples/posix/testing_fed_conn_sender.c b/examples/posix/testing_fed_conn_sender.c index e8da205e..9be199f2 100644 --- a/examples/posix/testing_fed_conn_sender.c +++ b/examples/posix/testing_fed_conn_sender.c @@ -153,7 +153,7 @@ void MainSender_ctor(MainSender *self, Environment *env) { CONN_REGISTER_UPSTREAM(self->bundle.conn, self->sender.out); Reactor_ctor(&self->super, "MainSender", env, NULL, self->_children, 1, NULL, 0, NULL, 0); } -ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1) +ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1, true) int main() { lf_start(); diff --git a/examples/posix/timer_ex.c b/examples/posix/timer_ex.c index 97989046..fd300d47 100644 --- a/examples/posix/timer_ex.c +++ b/examples/posix/timer_ex.c @@ -46,7 +46,7 @@ int main() { struct MyReactor my_reactor; Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); - env.scheduler.set_timeout(&env.scheduler, SEC(1)); + env.scheduler.duration = MSEC(100); MyReactor_ctor(&my_reactor, &env); env.assemble(&env); env.start(&env); diff --git a/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c b/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c index b2f0f80c..ab0fae7e 100644 --- a/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c +++ b/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c @@ -100,7 +100,7 @@ void MainRecv_ctor(MainRecv *self, Environment *env) { self->_bundles[0] = &self->bundle.super; } -ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1) +ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1, false) int main() { setup_led(); diff --git a/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c b/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c index 51230ae8..5ffaafe1 100644 --- a/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c +++ b/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c @@ -100,7 +100,7 @@ void MainRecv_ctor(MainRecv *self, Environment *env) { self->_bundles[0] = &self->bundle.super; } -ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1) +ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1, false) int main() { setup_led(); diff --git a/examples/zephyr/basic_federated/federated_sender/src/sender.c b/examples/zephyr/basic_federated/federated_sender/src/sender.c index 6fcb5cd7..1d1bbb39 100644 --- a/examples/zephyr/basic_federated/federated_sender/src/sender.c +++ b/examples/zephyr/basic_federated/federated_sender/src/sender.c @@ -180,7 +180,7 @@ void MainSender_ctor(MainSender *self, Environment *env) { self->_bundles[1] = &self->bundle2.super; } -ENTRY_POINT_FEDERATED(MainSender, FOREVER, true, true, 2) +ENTRY_POINT_FEDERATED(MainSender, FOREVER, true, true, 2, true) int main() { setup_button(); diff --git a/external/proto/message.pb.c b/external/proto/message.pb.c index 00ca9eb5..04639d2b 100644 --- a/external/proto/message.pb.c +++ b/external/proto/message.pb.c @@ -9,7 +9,15 @@ PB_BIND(Tag, Tag, AUTO) +PB_BIND(StartTagSignal, StartTagSignal, AUTO) + + PB_BIND(TaggedMessage, TaggedMessage, 2) +PB_BIND(FederateMessage, FederateMessage, 2) + + + + diff --git a/external/proto/message.pb.h b/external/proto/message.pb.h index c449f411..90edfd05 100644 --- a/external/proto/message.pb.h +++ b/external/proto/message.pb.h @@ -9,12 +9,22 @@ #error Regenerate this file with the current version of nanopb generator. #endif +/* Enum definitions */ +typedef enum _MessageType { + MessageType_TAGGED_MESSAGE = 1, + MessageType_START_TAG_SIGNAL = 2 +} MessageType; + /* Struct definitions */ typedef struct _Tag { int64_t time; uint32_t microstep; } Tag; +typedef struct _StartTagSignal { + Tag tag; +} StartTagSignal; + typedef PB_BYTES_ARRAY_T(832) TaggedMessage_payload_t; typedef struct _TaggedMessage { Tag tag; @@ -22,23 +32,51 @@ typedef struct _TaggedMessage { TaggedMessage_payload_t payload; } TaggedMessage; +typedef struct _FederateMessage { + MessageType type; + pb_size_t which_message; + union { + TaggedMessage tagged_message; + StartTagSignal start_tag_signal; + } message; +} FederateMessage; + #ifdef __cplusplus extern "C" { #endif +/* Helper constants for enums */ +#define _MessageType_MIN MessageType_TAGGED_MESSAGE +#define _MessageType_MAX MessageType_START_TAG_SIGNAL +#define _MessageType_ARRAYSIZE ((MessageType)(MessageType_START_TAG_SIGNAL+1)) + + + + +#define FederateMessage_type_ENUMTYPE MessageType + + /* Initializer values for message structs */ #define Tag_init_default {0, 0} +#define StartTagSignal_init_default {Tag_init_default} #define TaggedMessage_init_default {Tag_init_default, 0, {0, {0}}} +#define FederateMessage_init_default {_MessageType_MIN, 0, {TaggedMessage_init_default}} #define Tag_init_zero {0, 0} +#define StartTagSignal_init_zero {Tag_init_zero} #define TaggedMessage_init_zero {Tag_init_zero, 0, {0, {0}}} +#define FederateMessage_init_zero {_MessageType_MIN, 0, {TaggedMessage_init_zero}} /* Field tags (for use in manual encoding/decoding) */ #define Tag_time_tag 1 #define Tag_microstep_tag 2 +#define StartTagSignal_tag_tag 1 #define TaggedMessage_tag_tag 1 #define TaggedMessage_conn_id_tag 2 #define TaggedMessage_payload_tag 3 +#define FederateMessage_type_tag 1 +#define FederateMessage_tagged_message_tag 2 +#define FederateMessage_start_tag_signal_tag 3 /* Struct field encoding specification for nanopb */ #define Tag_FIELDLIST(X, a) \ @@ -47,6 +85,12 @@ X(a, STATIC, REQUIRED, UINT32, microstep, 2) #define Tag_CALLBACK NULL #define Tag_DEFAULT NULL +#define StartTagSignal_FIELDLIST(X, a) \ +X(a, STATIC, REQUIRED, MESSAGE, tag, 1) +#define StartTagSignal_CALLBACK NULL +#define StartTagSignal_DEFAULT NULL +#define StartTagSignal_tag_MSGTYPE Tag + #define TaggedMessage_FIELDLIST(X, a) \ X(a, STATIC, REQUIRED, MESSAGE, tag, 1) \ X(a, STATIC, REQUIRED, INT32, conn_id, 2) \ @@ -55,15 +99,30 @@ X(a, STATIC, REQUIRED, BYTES, payload, 3) #define TaggedMessage_DEFAULT NULL #define TaggedMessage_tag_MSGTYPE Tag +#define FederateMessage_FIELDLIST(X, a) \ +X(a, STATIC, REQUIRED, UENUM, type, 1) \ +X(a, STATIC, ONEOF, MESSAGE, (message,tagged_message,message.tagged_message), 2) \ +X(a, STATIC, ONEOF, MESSAGE, (message,start_tag_signal,message.start_tag_signal), 3) +#define FederateMessage_CALLBACK NULL +#define FederateMessage_DEFAULT (const pb_byte_t*)"\x08\x01\x00" +#define FederateMessage_message_tagged_message_MSGTYPE TaggedMessage +#define FederateMessage_message_start_tag_signal_MSGTYPE StartTagSignal + extern const pb_msgdesc_t Tag_msg; +extern const pb_msgdesc_t StartTagSignal_msg; extern const pb_msgdesc_t TaggedMessage_msg; +extern const pb_msgdesc_t FederateMessage_msg; /* Defines for backwards compatibility with code written before nanopb-0.4.0 */ #define Tag_fields &Tag_msg +#define StartTagSignal_fields &StartTagSignal_msg #define TaggedMessage_fields &TaggedMessage_msg +#define FederateMessage_fields &FederateMessage_msg /* Maximum encoded size of messages (where known) */ -#define MESSAGE_PB_H_MAX_SIZE TaggedMessage_size +#define FederateMessage_size 870 +#define MESSAGE_PB_H_MAX_SIZE FederateMessage_size +#define StartTagSignal_size 19 #define Tag_size 17 #define TaggedMessage_size 865 diff --git a/external/proto/message.proto b/external/proto/message.proto index f83f40c4..473c702f 100644 --- a/external/proto/message.proto +++ b/external/proto/message.proto @@ -1,12 +1,24 @@ import "nanopb.proto"; +enum MessageType { TAGGED_MESSAGE = 1; START_TAG_SIGNAL = 2; } + message Tag { required int64 time = 1; required uint32 microstep = 2; } +message StartTagSignal { required Tag tag = 1; } + message TaggedMessage { required Tag tag = 1; required int32 conn_id = 2; required bytes payload = 3 [(nanopb).max_size = 832]; } + +message FederateMessage { + required MessageType type = 1; + oneof message { + TaggedMessage tagged_message = 2; + StartTagSignal start_tag_signal = 3; + } +} diff --git a/include/reactor-uc/encoding.h b/include/reactor-uc/encoding.h index 76fa5576..798bfe03 100644 --- a/include/reactor-uc/encoding.h +++ b/include/reactor-uc/encoding.h @@ -5,22 +5,22 @@ #include #include -int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t buffer_size) { +int encode_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size) { // turing write buffer into pb_ostream buffer pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size); // serializing protobuf into buffer - if (!pb_encode(&stream_out, TaggedMessage_fields, message)) { + if (!pb_encode(&stream_out, FederateMessage_fields, message)) { return -1; } return (int)stream_out.bytes_written; } -int decode_protobuf(TaggedMessage *message, const unsigned char *buffer, size_t buffer_size) { +int decode_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size) { pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size); - if (!pb_decode(&stream_in, TaggedMessage_fields, message)) { + if (!pb_decode(&stream_in, FederateMessage_fields, message)) { return -1; } diff --git a/include/reactor-uc/federated.h b/include/reactor-uc/federated.h index 7b10f3cf..258fd031 100644 --- a/include/reactor-uc/federated.h +++ b/include/reactor-uc/federated.h @@ -67,4 +67,6 @@ struct FederatedInputConnection { void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *parent, interval_t delay, ConnectionType type, Port **downstreams, size_t downstreams_size, void *payload_buf, bool *payload_used_buf, size_t payload_size, size_t payload_buf_capacity); + +void Federated_distribute_start_tag(Environment *env, instant_t start_time); #endif \ No newline at end of file diff --git a/include/reactor-uc/logging.h b/include/reactor-uc/logging.h index 2101cd0a..9ea8d7ba 100644 --- a/include/reactor-uc/logging.h +++ b/include/reactor-uc/logging.h @@ -1,5 +1,6 @@ #ifndef REACTOR_UC_LOGGING_H #define REACTOR_UC_LOGGING_H +#include #include // The different verbosity levels supported diff --git a/include/reactor-uc/macros.h b/include/reactor-uc/macros.h index a036e7e6..fc9307e5 100644 --- a/include/reactor-uc/macros.h +++ b/include/reactor-uc/macros.h @@ -261,21 +261,22 @@ typedef struct FederatedInputConnection FederatedInputConnection; void lf_start() { \ Environment_ctor(&env, (Reactor *)&main_reactor); \ MainReactorName##_ctor(&main_reactor, &env); \ - env.scheduler.set_timeout(&env.scheduler, Timeout); \ - env.assemble(&env); \ + env.scheduler.duration = Timeout; \ env.scheduler.keep_alive = KeepAlive; \ + env.assemble(&env); \ env.start(&env); \ lf_exit(); \ } -#define ENTRY_POINT_FEDERATED(FederateName, Timeout, KeepAlive, HasInputs, NumBundles) \ +#define ENTRY_POINT_FEDERATED(FederateName, Timeout, KeepAlive, HasInputs, NumBundles, IsLeader) \ FederateName main_reactor; \ Environment env; \ void lf_exit(void) { Environment_free(&env); } \ void lf_start() { \ Environment_ctor(&env, (Reactor *)&main_reactor); \ - env.scheduler.set_timeout(&env.scheduler, Timeout); \ + env.scheduler.duration = Timeout; \ env.scheduler.keep_alive = KeepAlive; \ + env.scheduler.leader = IsLeader; \ env.has_async_events = HasInputs; \ env.enter_critical_section(&env); \ FederateName##_ctor(&main_reactor, &env); \ diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index b3e5037e..153b822c 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -17,9 +17,10 @@ struct NetworkChannel { lf_ret_t (*open_connection)(NetworkChannel *self); lf_ret_t (*try_connect)(NetworkChannel *self); void (*close_connection)(NetworkChannel *self); - lf_ret_t (*send_blocking)(NetworkChannel *self, TaggedMessage *message); + lf_ret_t (*send_blocking)(NetworkChannel *self, const FederateMessage *message); void (*register_receive_callback)(NetworkChannel *self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), + void (*receive_callback)(FederatedConnectionBundle *conn, + const FederateMessage *message), FederatedConnectionBundle *conn); void (*free)(NetworkChannel *self); }; diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index 681482f2..74a2cf6d 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -26,7 +26,7 @@ struct TcpIpChannel { unsigned short port; int protocol_family; - TaggedMessage output; + FederateMessage output; unsigned char write_buffer[TCP_IP_CHANNEL_BUFFERSIZE]; unsigned char read_buffer[TCP_IP_CHANNEL_BUFFERSIZE]; unsigned int read_index; @@ -41,7 +41,7 @@ struct TcpIpChannel { char receive_thread_stack[TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE]; FederatedConnectionBundle *federated_connection; - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message); + void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message); }; void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family, bool server); diff --git a/include/reactor-uc/scheduler.h b/include/reactor-uc/scheduler.h index e6ca8392..ded4566c 100644 --- a/include/reactor-uc/scheduler.h +++ b/include/reactor-uc/scheduler.h @@ -15,7 +15,9 @@ struct Scheduler { // that are registered for cleanup at the end of the current tag. Trigger *cleanup_ll_head; Trigger *cleanup_ll_tail; + bool leader; // Whether this scheduler is the leader in a federated program and selects the start tag. instant_t start_time; // The physical time at which the program started. + interval_t duration; // The duration after which the program should stop. tag_t stop_tag; // The tag at which the program should stop. This is set by the user or by the scheduler. tag_t current_tag; // The current logical tag. Set by the scheduler and read by user in the reaction bodies. bool keep_alive; // Whether the program should keep running even if there are no more events to process. @@ -64,16 +66,13 @@ struct Scheduler { void (*request_shutdown)(Scheduler *self); - /** - * @brief Set the stop tag of the program based on a timeout duration. - */ - void (*set_timeout)(Scheduler *self, interval_t duration); - /** * @brief Register Trigger for cleanup. The cleanup function of the trigger * will be called in `clean_up_timestep`. */ void (*register_for_cleanup)(Scheduler *self, Trigger *trigger); + + void (*acquire_and_schedule_start_tag)(Scheduler *self); }; void Scheduler_ctor(Scheduler *self, Environment *env); diff --git a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt index 268ed5c7..d7104e77 100644 --- a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt +++ b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcMainGenerator.kt @@ -43,7 +43,7 @@ class UcMainGenerator( |static ${main.codeType} main_reactor; |void lf_main(void) { | Environment_ctor(&env, &main_reactor.super); - | ${if (targetConfig.isSet(TimeOutProperty.INSTANCE)) "env.scheduler.set_timeout(&env.scheduler, ${targetConfig.get(TimeOutProperty.INSTANCE).toCCode()});" else ""} + | ${if (targetConfig.isSet(TimeOutProperty.INSTANCE)) "env.scheduler.set_duration(&env.scheduler, ${targetConfig.get(TimeOutProperty.INSTANCE).toCCode()});" else ""} | ${main.codeType}_ctor(&main_reactor, &env, NULL); | env.assemble(&env); | env.start(&env); diff --git a/src/environment.c b/src/environment.c index 6f82e97a..59067dd0 100644 --- a/src/environment.c +++ b/src/environment.c @@ -8,19 +8,8 @@ void Environment_assemble(Environment *self) { validaten(self->main->calculate_levels(self->main)); } -// Find the start time of the federation. -// FIXME: This needs to involve communcation with the other federates. Currently hardcoded to 1 second. -static void Environment_set_start_time(Environment *self) { -#if defined(PLATFORM_POSIX) - self->scheduler.start_time = ((self->platform->get_physical_time(self->platform) + SEC(1)) / SEC(1)) * SEC(1); -#else - self->scheduler.start_time = SEC(1); -#endif - LF_INFO(ENV, "Start time: %" PRId64, self->scheduler.start_time); -} - void Environment_start(Environment *self) { - Environment_set_start_time(self); + self->scheduler.acquire_and_schedule_start_tag(&self->scheduler); self->scheduler.run(&self->scheduler); } diff --git a/src/federated.c b/src/federated.c index 2e34f8c5..724575c9 100644 --- a/src/federated.c +++ b/src/federated.c @@ -46,17 +46,21 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { assert(trigger->is_registered_for_cleanup); assert(trigger->is_present == false); - TaggedMessage msg; - msg.conn_id = self->conn_id; - msg.tag.time = sched->current_tag.time; - msg.tag.microstep = sched->current_tag.microstep; + FederateMessage msg; + msg.type = MessageType_TAGGED_MESSAGE; + msg.which_message = FederateMessage_tagged_message_tag; + + TaggedMessage *tagged_msg = &msg.message.tagged_message; + tagged_msg->conn_id = self->conn_id; + tagged_msg->tag.time = sched->current_tag.time; + tagged_msg->tag.microstep = sched->current_tag.microstep; size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size, - msg.payload.bytes); - msg.payload.size = msg_size; + tagged_msg->payload.bytes); + tagged_msg->payload.size = msg_size; - LF_DEBUG(FED, "FedOutConn %p sending message with tag=%" PRId64 ":%" PRIu32, trigger, msg.tag.time, - msg.tag.microstep); + LF_DEBUG(FED, "FedOutConn %p sending tagged message with tag=%" PRId64 ":%" PRIu32, trigger, tagged_msg->tag.time, + tagged_msg->tag.microstep); lf_ret_t ret = channel->send_blocking(channel, &msg); if (ret != LF_OK) { @@ -114,9 +118,36 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare self->safe_to_assume_absent = FOREVER; } +void FederatedConnectionBundle_handle_start_tag_signal(FederatedConnectionBundle *self, const FederateMessage *_msg) { + const StartTagSignal *msg = &_msg->message.start_tag_signal; + LF_DEBUG(FED, "Received start tag signal with tag=%" PRId64 ":%" PRIu32, msg->tag.time, msg->tag.microstep); + Environment *env = self->parent->env; + Scheduler *sched = &env->scheduler; + env->platform->enter_critical_section(env->platform); + + if (sched->start_time == NEVER) { + LF_DEBUG(FED, "First time receiving star tag. Setting tag and sending to other federates"); + sched->start_time = msg->tag.time; + env->platform->new_async_event(env->platform); + + for (size_t i = 0; i < env->net_bundles_size; i++) { + FederatedConnectionBundle *bundle = env->net_bundles[i]; + if (bundle != self) { + bundle->net_channel->send_blocking(bundle->net_channel, _msg); + } + } + } else { + LF_DEBUG(FED, "Ignoring start tag signal. Already received one"); + assert(msg->tag.time == sched->start_time); + } + + env->platform->leave_critical_section(env->platform); +} + // Callback registered with the NetworkChannel. Is called asynchronously when there is a // a TaggedMessage available. -void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, TaggedMessage *msg) { +void FederatedConnectionBundle_handle_tagged_msg(FederatedConnectionBundle *self, const FederateMessage *_msg) { + const TaggedMessage *msg = &_msg->message.tagged_message; LF_DEBUG(FED, "Callback on FedConnBundle %p for message with tag=%" PRId64 ":%" PRIu32, self, msg->tag.time, msg->tag.microstep); assert(((size_t)msg->conn_id) < self->inputs_size); @@ -178,15 +209,26 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, env->platform->leave_critical_section(env->platform); } +void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, const FederateMessage *msg) { + switch (msg->type) { + case MessageType_TAGGED_MESSAGE: + FederatedConnectionBundle_handle_tagged_msg(self, msg); + break; + case MessageType_START_TAG_SIGNAL: + FederatedConnectionBundle_handle_start_tag_signal(self, msg); + break; + default: + LF_ERR(FED, "Unknown message type %d", msg->type); + } +} + lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { memcpy(user_struct, msg_buf, MIN(msg_size, 832)); - return LF_OK; } size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { memcpy(msg_buf, user_struct, MIN(user_struct_size, 832)); - return MIN(user_struct_size, 832); } @@ -206,3 +248,17 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa // Register callback function for message received. self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); } + +void Federated_distribute_start_tag(Environment *env, instant_t start_time) { + LF_DEBUG(FED, "Distribute start time of %" PRId64 " to other federates", start_time); + FederateMessage start_tag_signal; + start_tag_signal.type = MessageType_START_TAG_SIGNAL; + start_tag_signal.which_message = FederateMessage_start_tag_signal_tag; + start_tag_signal.message.start_tag_signal.tag.time = start_time; + start_tag_signal.message.start_tag_signal.tag.microstep = 0; + + for (size_t i = 0; i < env->net_bundles_size; i++) { + FederatedConnectionBundle *bundle = env->net_bundles[i]; + bundle->net_channel->send_blocking(bundle->net_channel, &start_tag_signal); + } +} \ No newline at end of file diff --git a/src/platform/pico/pico.c b/src/platform/pico/pico.c index ed4d47bd..e3843220 100644 --- a/src/platform/pico/pico.c +++ b/src/platform/pico/pico.c @@ -1,5 +1,5 @@ -#include "reactor-uc/platform/pico/pico.h" #include "reactor-uc/logging.h" +#include "reactor-uc/platform/pico/pico.h" #include #include #include diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 0585d9a1..a5591a5a 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -96,7 +96,7 @@ static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) { return LF_OK; } -static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, TaggedMessage *message) { +static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) { LF_DEBUG(NET, "TcpIpChannel sending message"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; @@ -144,7 +144,7 @@ static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, TaggedM return LF_OK; } -static TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { +const FederateMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; int socket; @@ -207,7 +207,7 @@ static void *TcpIpChannel_receive_thread(void *untyped_self) { self->terminate = false; while (!self->terminate) { - TaggedMessage *msg = TcpIpChannel_receive(untyped_self); + const FederateMessage *msg = TcpIpChannel_receive(untyped_self); if (msg) { self->receive_callback(self->federated_connection, msg); @@ -219,7 +219,7 @@ static void *TcpIpChannel_receive_thread(void *untyped_self) { static void TcpIpChannel_register_receive_callback(NetworkChannel *untyped_self, void (*receive_callback)(FederatedConnectionBundle *conn, - TaggedMessage *msg), + const FederateMessage *msg), FederatedConnectionBundle *conn) { int res; LF_INFO(NET, "TCP/IP registering callback thread"); diff --git a/src/reactor.c b/src/reactor.c index 12a18cee..6dac1316 100644 --- a/src/reactor.c +++ b/src/reactor.c @@ -13,14 +13,7 @@ void Reactor_register_startup(Reactor *self, BuiltinTrigger *startup) { (void)self; LF_DEBUG(ENV, "Registering startup trigger %p with Reactor %s", startup, self->name); Environment *env = self->env; - Scheduler *sched = &env->scheduler; - - assert(startup->super.type == TRIG_STARTUP); - if (!env->startup) { - tag_t start_tag = {.microstep = 0, .time = sched->start_time}; - Event event = EVENT_INIT(start_tag, &startup->super, NULL); - validaten(env->scheduler.schedule_at(&env->scheduler, &event)); env->startup = startup; } else { BuiltinTrigger *last_in_chain = env->startup; diff --git a/src/scheduler.c b/src/scheduler.c index 9ae4baff..2b62ded7 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -145,6 +145,58 @@ void Scheduler_do_shutdown(Scheduler *self, tag_t shutdown_tag) { } } +void Scheduler_schedule_startups(Scheduler *self, tag_t start_tag) { + Environment *env = self->env; + if (env->startup) { + Event event = EVENT_INIT(start_tag, &env->startup->super, NULL); + self->schedule_at_locked(self, &event); + } +} + +void Scheduler_schedule_timers(Scheduler *self, Reactor *reactor, tag_t start_tag) { + for (size_t i = 0; i < reactor->triggers_size; i++) { + Trigger *trigger = reactor->triggers[i]; + if (trigger->type == TRIG_TIMER) { + Timer *timer = (Timer *)trigger; + tag_t tag = {.time = start_tag.time + timer->offset, .microstep = start_tag.microstep}; + Event event = EVENT_INIT(tag, &timer->super, NULL); + self->schedule_at_locked(self, &event); + } + } + for (size_t i = 0; i < reactor->children_size; i++) { + Scheduler_schedule_timers(self, reactor->children[i], start_tag); + } +} + +void Scheduler_acquire_and_schedule_start_tag(Scheduler *self) { + Environment *env = self->env; + env->enter_critical_section(env); + if (env->net_bundles_size == 0) { + self->start_time = env->get_physical_time(env); + LF_DEBUG(SCHED, "No federated connections, picking start_time %" PRId64, self->start_time); + } else if (self->leader) { + self->start_time = env->get_physical_time(env); + LF_DEBUG(SCHED, "Is leader of the federation, picking start_time %" PRId64, self->start_time); + Federated_distribute_start_tag(env, self->start_time); + } else { + LF_DEBUG(SCHED, "Not leader, waiting for start tag signal"); + while (self->start_time == NEVER) { + env->wait_until(env, FOREVER); + } + } + LF_DEBUG(SCHED, "Start_time is %" PRId64, self->start_time); + tag_t start_tag = {.time = self->start_time, .microstep = 0}; + // Set the stop tag + self->stop_tag = lf_delay_tag(start_tag, self->duration); + LF_DEBUG(INFO, "Start time is %" PRId64 "and stop time is %" PRId64 " (%" PRId32 ")", self->start_time, + self->stop_tag.time, self->duration); + + // Schedule the initial events + Scheduler_schedule_startups(self, start_tag); + Scheduler_schedule_timers(self, env->main, start_tag); + env->leave_critical_section(env); +} + void Scheduler_run(Scheduler *self) { Environment *env = self->env; lf_ret_t res; @@ -250,10 +302,7 @@ lf_ret_t Scheduler_schedule_at(Scheduler *self, Event *event) { return res; } -void Scheduler_set_timeout(Scheduler *self, interval_t duration) { - self->stop_tag.microstep = 0; - self->stop_tag.time = lf_time_add(self->start_time, duration); -} +void Scheduler_set_duration(Scheduler *self, interval_t duration) { self->duration = duration; } void Scheduler_request_shutdown(Scheduler *self) { Environment *env = self->env; @@ -275,17 +324,16 @@ void Scheduler_ctor(Scheduler *self, Environment *env) { self->schedule_at = Scheduler_schedule_at; self->schedule_at_locked = Scheduler_schedule_at_locked; self->register_for_cleanup = Scheduler_register_for_cleanup; - self->set_timeout = Scheduler_set_timeout; self->request_shutdown = Scheduler_request_shutdown; + self->acquire_and_schedule_start_tag = Scheduler_acquire_and_schedule_start_tag; self->keep_alive = false; self->stop_tag = FOREVER_TAG; self->current_tag = NEVER_TAG; + self->start_time = NEVER; + self->duration = FOREVER; self->cleanup_ll_head = NULL; self->cleanup_ll_tail = NULL; + self->leader = false; EventQueue_ctor(&self->event_queue); ReactionQueue_ctor(&self->reaction_queue); - - // Set start time - self->start_time = ((self->env->platform->get_physical_time(self->env->platform) + SEC(1)) / SEC(1)) * SEC(1); - LF_INFO(ENV, "Start time: %" PRId64, self->start_time); } diff --git a/src/timer.c b/src/timer.c index d6e60db0..1ce5b983 100644 --- a/src/timer.c +++ b/src/timer.c @@ -38,11 +38,4 @@ void Timer_ctor(Timer *self, Reactor *parent, instant_t offset, interval_t perio self->effects.num_registered = 0; Trigger_ctor(&self->super, TRIG_TIMER, parent, NULL, Timer_prepare, Timer_cleanup); - - // Schedule first - Scheduler *sched = &self->super.parent->env->scheduler; - tag_t tag = {.microstep = 0, .time = sched->start_time + offset}; - Event event = EVENT_INIT(tag, &self->super, NULL); - - sched->schedule_at(sched, &event); } diff --git a/test/lf/src/Parameters.lf b/test/lf/src/Parameters.lf index 6d01f282..29e59dfd 100644 --- a/test/lf/src/Parameters.lf +++ b/test/lf/src/Parameters.lf @@ -5,6 +5,7 @@ reactor Inner(offset: time = 15 msec) { timer t(offset) reaction(t) {= + printf("Tag=%d\n", env->get_elapsed_logical_time(env)); assert(env->get_elapsed_logical_time(env) == MSEC(5)); =} } diff --git a/test/unit/action_test.c b/test/unit/action_test.c index 5e042234..b9735cac 100644 --- a/test/unit/action_test.c +++ b/test/unit/action_test.c @@ -58,7 +58,7 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); - env.scheduler.set_timeout(&env.scheduler, MSEC(100)); + env.scheduler.duration = MSEC(100); env.assemble(&env); env.start(&env); Environment_free(&env); diff --git a/test/unit/delayed_conn_test.c b/test/unit/delayed_conn_test.c index 2e401eff..b16c86c6 100644 --- a/test/unit/delayed_conn_test.c +++ b/test/unit/delayed_conn_test.c @@ -108,7 +108,7 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&main); Main_ctor(&main, &env); - env.scheduler.set_timeout(&env.scheduler, MSEC(100)); + env.scheduler.duration = MSEC(100); env.assemble(&env); env.start(&env); Environment_free(&env); diff --git a/test/unit/nanopb_test.c b/test/unit/nanopb_test.c index 634ac5b9..1d646bd7 100644 --- a/test/unit/nanopb_test.c +++ b/test/unit/nanopb_test.c @@ -10,27 +10,33 @@ void test_nanopb() { - TaggedMessage original_message; - TaggedMessage deserialized_message; + FederateMessage _original_msg; + FederateMessage _deserialized_msg; + + _original_msg.type = MessageType_TAGGED_MESSAGE; + _original_msg.which_message = FederateMessage_tagged_message_tag; + + TaggedMessage *original_message = &_original_msg.message.tagged_message; + TaggedMessage *deserialized_msg = &_deserialized_msg.message.tagged_message; unsigned char buffer[BUFFER_SIZE]; unsigned char *message = NULL; int message_size = 0; - original_message.conn_id = MSG_ID; + original_message->conn_id = MSG_ID; const char *text = "Hello World1234"; - memcpy(original_message.payload.bytes, text, sizeof("Hello World1234")); // NOLINT - original_message.payload.size = sizeof("Hello World1234"); + memcpy(original_message->payload.bytes, text, sizeof("Hello World1234")); // NOLINT + original_message->payload.size = sizeof("Hello World1234"); message = buffer; - message_size = encode_protobuf(&original_message, buffer, BUFFER_SIZE); + message_size = encode_protobuf(&_original_msg, buffer, BUFFER_SIZE); TEST_ASSERT_TRUE(message_size > 0); - int remaining_bytes = decode_protobuf(&deserialized_message, message, message_size); + int remaining_bytes = decode_protobuf(&_deserialized_msg, message, message_size); TEST_ASSERT_TRUE(remaining_bytes >= 0); - TEST_ASSERT_EQUAL(original_message.conn_id, deserialized_message.conn_id); - TEST_ASSERT_EQUAL_STRING((char *)original_message.payload.bytes, (char *)deserialized_message.payload.bytes); + TEST_ASSERT_EQUAL(original_message->conn_id, deserialized_msg->conn_id); + TEST_ASSERT_EQUAL_STRING((char *)original_message->payload.bytes, (char *)deserialized_msg->payload.bytes); } int main(void) { diff --git a/test/unit/physical_action_test.c b/test/unit/physical_action_test.c index 68a3a4e7..0b802329 100644 --- a/test/unit/physical_action_test.c +++ b/test/unit/physical_action_test.c @@ -95,7 +95,7 @@ void test_simple() { MyReactor my_reactor; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); - env.scheduler.set_timeout(&env.scheduler, MSEC(100)); + env.scheduler.duration = MSEC(100); env.assemble(&env); env.start(&env); Environment_free(&env); diff --git a/test/unit/port_test.c b/test/unit/port_test.c index 5aa702bb..2c7d7cc9 100644 --- a/test/unit/port_test.c +++ b/test/unit/port_test.c @@ -107,7 +107,7 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&main); Main_ctor(&main, &env); - env.scheduler.set_timeout(&env.scheduler, MSEC(100)); + env.scheduler.duration = MSEC(100); env.assemble(&env); env.start(&env); Environment_free(&env); diff --git a/test/unit/request_shutdown_test.c b/test/unit/request_shutdown_test.c index 2b3b69f3..8401d5e9 100644 --- a/test/unit/request_shutdown_test.c +++ b/test/unit/request_shutdown_test.c @@ -77,7 +77,7 @@ void test_simple() { Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); - env.scheduler.set_timeout(&env.scheduler, MSEC(100)); + env.scheduler.duration = MSEC(100); env.assemble(&env); env.start(&env); Environment_free(&env); diff --git a/test/unit/tcp_channel_test.c b/test/unit/tcp_channel_test.c index 637d86d0..0bfba788 100644 --- a/test/unit/tcp_channel_test.c +++ b/test/unit/tcp_channel_test.c @@ -54,8 +54,9 @@ void test_client_try_connect_non_blocking(void) { int ret = client_channel->try_connect(client_channel); } -void server_callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { +void server_callback_handler(FederatedConnectionBundle *self, const FederateMessage *_msg) { (void)self; + const TaggedMessage *msg = &_msg->message.tagged_message; printf("\nServer: Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); TEST_ASSERT_EQUAL_STRING(MESSAGE_CONTENT, (char *)msg->payload.bytes); @@ -77,14 +78,18 @@ void test_client_send_and_server_recv(void) { server_channel->register_receive_callback(server_channel, server_callback_handler, NULL); /* create message */ - TaggedMessage port_message; - port_message.conn_id = MESSAGE_CONNECTION_ID; + FederateMessage msg; + msg.type = MessageType_TAGGED_MESSAGE; + msg.which_message = FederateMessage_tagged_message_tag; + + TaggedMessage *port_message = &msg.message.tagged_message; + port_message->conn_id = MESSAGE_CONNECTION_ID; const char *message = MESSAGE_CONTENT; - memcpy(port_message.payload.bytes, message, sizeof(MESSAGE_CONTENT)); - port_message.payload.size = sizeof(MESSAGE_CONTENT); + memcpy(port_message->payload.bytes, message, sizeof(MESSAGE_CONTENT)); // NOLINT + port_message->payload.size = sizeof(MESSAGE_CONTENT); /* send message */ - client_channel->send_blocking(client_channel, &port_message); + client_channel->send_blocking(client_channel, &msg); /* wait for the callback */ sleep(1); @@ -93,8 +98,9 @@ void test_client_send_and_server_recv(void) { TEST_ASSERT_TRUE(server_callback_called); } -void client_callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { +void client_callback_handler(FederatedConnectionBundle *self, const FederateMessage *_msg) { (void)self; + const TaggedMessage *msg = &_msg->message.tagged_message; printf("\nClient: Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); TEST_ASSERT_EQUAL_STRING(MESSAGE_CONTENT, (char *)msg->payload.bytes); @@ -116,14 +122,18 @@ void test_server_send_and_client_recv(void) { client_channel->register_receive_callback(client_channel, client_callback_handler, NULL); /* create message */ - TaggedMessage port_message; - port_message.conn_id = MESSAGE_CONNECTION_ID; + FederateMessage msg; + msg.type = MessageType_TAGGED_MESSAGE; + msg.which_message = FederateMessage_tagged_message_tag; + + TaggedMessage *port_message = &msg.message.tagged_message; + port_message->conn_id = MESSAGE_CONNECTION_ID; const char *message = MESSAGE_CONTENT; - memcpy(port_message.payload.bytes, message, sizeof(MESSAGE_CONTENT)); - port_message.payload.size = sizeof(MESSAGE_CONTENT); + memcpy(port_message->payload.bytes, message, sizeof(MESSAGE_CONTENT)); // NOLINT + port_message->payload.size = sizeof(MESSAGE_CONTENT); /* send message */ - server_channel->send_blocking(server_channel, &port_message); + server_channel->send_blocking(server_channel, &msg); /* wait for the callback */ sleep(1); diff --git a/test/unit/timer_test.c b/test/unit/timer_test.c index cd739be8..b0750fc0 100644 --- a/test/unit/timer_test.c +++ b/test/unit/timer_test.c @@ -37,7 +37,7 @@ MyReactor my_reactor; Environment env; void test_simple() { Environment_ctor(&env, (Reactor *)&my_reactor); - env.scheduler.set_timeout(&env.scheduler, MSEC(100)); + env.scheduler.duration = MSEC(100); MyReactor_ctor(&my_reactor, &env); env.assemble(&env); env.start(&env);