From a0c1d75e99b3dee00dd1891d1e04efe97da56c1d Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 00:29:56 +0200 Subject: [PATCH 01/10] add functions to set serialize/deserialize functions --- .../posix/testing_tcp_ip_channel_server_callback.c | 4 +++- include/reactor-uc/encoding.h | 8 ++++++-- include/reactor-uc/network_channel.h | 8 +++++++- include/reactor-uc/platform/posix/tcp_ip_channel.h | 5 ++++- src/federated.c | 4 +++- src/platform/posix/tcp_ip_channel.c | 14 +++++++++++++- 6 files changed, 36 insertions(+), 7 deletions(-) diff --git a/examples/posix/testing_tcp_ip_channel_server_callback.c b/examples/posix/testing_tcp_ip_channel_server_callback.c index c6a4e50c..c922009e 100644 --- a/examples/posix/testing_tcp_ip_channel_server_callback.c +++ b/examples/posix/testing_tcp_ip_channel_server_callback.c @@ -5,7 +5,9 @@ #include TcpIpChannel channel; -void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { +void callback_handler(FederatedConnectionBundle *self, void *raw_msg) { + TaggedMessage* msg = raw_msg; + printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); channel.super.send(&channel.super, msg); } diff --git a/include/reactor-uc/encoding.h b/include/reactor-uc/encoding.h index 76fa5576..80ce6821 100644 --- a/include/reactor-uc/encoding.h +++ b/include/reactor-uc/encoding.h @@ -5,7 +5,9 @@ #include #include -int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t buffer_size) { +int encode_protobuf(const void *raw_message, unsigned char *buffer, size_t buffer_size) { + const TaggedMessage *message = raw_message; + // turing write buffer into pb_ostream buffer pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size); @@ -17,7 +19,9 @@ int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t return (int)stream_out.bytes_written; } -int decode_protobuf(TaggedMessage *message, const unsigned char *buffer, size_t buffer_size) { +int decode_protobuf(void *raw_message, const unsigned char *buffer, size_t buffer_size) { + TaggedMessage *message = raw_message; + pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size); if (!pb_decode(&stream_in, TaggedMessage_fields, message)) { diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 1fd4b83a..2beeb6a8 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -12,17 +12,23 @@ typedef struct FederatedConnectionBundle FederatedConnectionBundle; typedef struct NetworkChannel NetworkChannel; +typedef int (*encode_message_hook)(const void *message, unsigned char *buffer, size_t buffer_size); +typedef int (*decode_message_hook)(void *message, const unsigned char *buffer, size_t buffer_size); + struct NetworkChannel { lf_ret_t (*bind)(NetworkChannel *self); lf_ret_t (*connect)(NetworkChannel *self); bool (*accept)(NetworkChannel *self); void (*close)(NetworkChannel *self); void (*register_callback)(NetworkChannel *self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), + void (*receive_callback)(FederatedConnectionBundle *conn, void *message), FederatedConnectionBundle *conn); lf_ret_t (*send)(NetworkChannel *self, TaggedMessage *message); TaggedMessage *(*receive)(NetworkChannel *self); void (*free)(NetworkChannel *self); + + void (*register_encode_hook)(NetworkChannel *self, encode_message_hook encode_hook); + void (*register_decode_hook)(NetworkChannel *self, decode_message_hook decode_hook); }; #endif // REACTOR_UC_NETWORK_CHANNEL_H diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index e3491414..ef86ad91 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -42,7 +42,10 @@ struct TcpIpChannel { char receive_thread_stack[TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE]; FederatedConnectionBundle *federated_connection; - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message); + void (*receive_callback)(FederatedConnectionBundle *conn, void *message); + + decode_message_hook decode_hook; + encode_message_hook encode_hook; }; void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family); diff --git a/src/federated.c b/src/federated.c index cfef3acd..3ed66340 100644 --- a/src/federated.c +++ b/src/federated.c @@ -112,7 +112,9 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare // Callback registered with the NetworkChannel. Is called asynchronously when there is a // a TaggedMessage available. -void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, TaggedMessage *msg) { +void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, void *raw_msg) { + TaggedMessage *msg = raw_msg; + LF_DEBUG(FED, "Callback on FedConnBundle %p for message with tag=%" PRId64 ":%" PRIu32, self, msg->tag.time, msg->tag.microstep); assert(((size_t)msg->conn_id) < self->inputs_size); diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 4a2ff5ed..d810ae53 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -205,7 +205,7 @@ void *TcpIpChannel_receive_thread(void *untyped_self) { } void TcpIpChannel_register_callback(NetworkChannel *untyped_self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *msg), + void (*receive_callback)(FederatedConnectionBundle *conn, void *msg), FederatedConnectionBundle *conn) { int res; LF_INFO(NET, "TCP/IP registering callback thread"); @@ -238,6 +238,14 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, } } +void TcpIpChannel_register_decode_hook(NetworkChannel *self, decode_message_hook hook) { + ((TcpIpChannel *)self)->decode_hook = hook; +} + +void TcpIpChannel_register_encode_hook(NetworkChannel *self, encode_message_hook hook) { + ((TcpIpChannel *)self)->encode_hook = hook; +} + void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family) { FD_ZERO(&self->set); @@ -260,9 +268,13 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->super.receive = TcpIpChannel_receive; self->super.send = TcpIpChannel_send; self->super.register_callback = TcpIpChannel_register_callback; + self->super.register_decode_hook = TcpIpChannel_register_decode_hook; + self->super.register_encode_hook = TcpIpChannel_register_encode_hook; self->super.free = TcpIpChannel_free; self->receive_callback = NULL; self->federated_connection = NULL; + self->decode_hook = decode_protobuf; + self->encode_hook = encode_protobuf; } void TcpIpChannel_free(NetworkChannel *untyped_self) { From 7babd6f32522e701c33b67b6063030ab2973dc79 Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 01:04:58 +0200 Subject: [PATCH 02/10] clean up fixing some smells --- CMakeLists.txt | 2 +- src/action.c | 1 - src/platform/posix/posix.c | 2 -- src/platform/posix/tcp_ip_channel.c | 14 +++++--------- src/reaction.c | 2 -- src/scheduler.c | 3 +-- src/tag.c | 1 - src/timer.c | 2 -- src/trigger.c | 4 ---- 9 files changed, 7 insertions(+), 24 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ef79a893..5df2d9d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.9) project(reactor-uc LANGUAGES C) # Command line options for the build -set(BUILD_EXAMPLES OFF CACHE BOOL "Build examples") +set(BUILD_EXAMPLES ON CACHE BOOL "Build examples") set(BUILD_TESTS OFF CACHE BOOL "Build all tests") set(BUILD_LF_TESTS OFF CACHE BOOL "Build lf tests") set(BUILD_UNIT_TESTS OFF CACHE BOOL "Build unit tests") diff --git a/src/action.c b/src/action.c index 589a575c..9a4b0b80 100644 --- a/src/action.c +++ b/src/action.c @@ -3,7 +3,6 @@ #include "reactor-uc/logging.h" #include "reactor-uc/trigger.h" -#include #include void Action_cleanup(Trigger *self) { diff --git a/src/platform/posix/posix.c b/src/platform/posix/posix.c index b100bfd0..c7faa8e6 100644 --- a/src/platform/posix/posix.c +++ b/src/platform/posix/posix.c @@ -1,9 +1,7 @@ #include "reactor-uc/platform/posix/posix.h" #include "reactor-uc/logging.h" -#include #include #include -#include #include #include diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index d810ae53..0890d6c4 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -3,11 +3,7 @@ #include "reactor-uc/logging.h" #include -#include #include -#include -#include -#include #include #include #include @@ -115,7 +111,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) while (bytes_written < message_size && timeout > 0) { LF_DEBUG(NET, "Sending %d bytes", message_size - bytes_written); - int bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0); + ssize_t bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0); LF_DEBUG(NET, "%d bytes sent", bytes_send); if (bytes_send < 0) { @@ -149,13 +145,13 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { // calculating the maximum amount of bytes we can read int bytes_available = TCP_IP_CHANNEL_BUFFERSIZE - self->read_index; - int bytes_left = 0; + int bytes_left; bool read_more = true; while (read_more) { // reading from socket - int bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0); + ssize_t bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0); if (bytes_read < 0) { LF_ERR(NET, "Error recv from socket %d", errno); @@ -214,7 +210,7 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, self->receive_callback = receive_callback; self->federated_connection = conn; memset(&self->receive_thread_stack, 0, TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE); - if (pthread_attr_init(&self->receive_thread_attr) < 0) { + if (pthread_attr_init(&self->receive_thread_attr) != 0) { throw("pthread_attr_init failed"); } /* TODO: RIOT posix-wrappers don't have pthread_attr_setstack yet */ @@ -228,7 +224,7 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, } #else if (pthread_attr_setstack(&self->receive_thread_attr, &self->receive_thread_stack, - TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE - TCP_IP_CHANNEL_RECV_THREAD_STACK_GUARD_SIZE) < 0) { + TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE - TCP_IP_CHANNEL_RECV_THREAD_STACK_GUARD_SIZE) != 0) { throw("pthread_attr_setstack failed"); } #endif diff --git a/src/reaction.c b/src/reaction.c index 5144712b..84d7a5ac 100644 --- a/src/reaction.c +++ b/src/reaction.c @@ -3,8 +3,6 @@ #include "reactor-uc/port.h" #include "reactor-uc/trigger.h" -#include - static size_t calculate_input_port_level(Input *port); size_t Reaction_get_level(Reaction *self) { diff --git a/src/scheduler.c b/src/scheduler.c index 09f75b89..1706c3af 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -1,7 +1,6 @@ #include "reactor-uc/scheduler.h" #include "reactor-uc/environment.h" #include "reactor-uc/logging.h" -#include "reactor-uc/reactor-uc.h" // Private functions @@ -147,7 +146,7 @@ void Scheduler_terminate(Scheduler *self) { void Scheduler_run(Scheduler *self) { Environment *env = self->env; - lf_ret_t res = 0; + lf_ret_t res; bool do_shutdown = false; bool non_terminating = self->keep_alive || env->has_async_events; LF_INFO(SCHED, "Scheduler running with non_terminating=%d has_async_events=%d", non_terminating, diff --git a/src/tag.c b/src/tag.c index e69d8226..a770859e 100644 --- a/src/tag.c +++ b/src/tag.c @@ -9,7 +9,6 @@ */ #include "reactor-uc/tag.h" -#include "reactor-uc/environment.h" instant_t lf_time_add(instant_t time, interval_t interval) { if (time == NEVER || interval == NEVER) { diff --git a/src/timer.c b/src/timer.c index 4bf24aa9..d6e60db0 100644 --- a/src/timer.c +++ b/src/timer.c @@ -2,8 +2,6 @@ #include "reactor-uc/environment.h" #include "reactor-uc/logging.h" -#include - void Timer_prepare(Trigger *_self, Event *event) { (void)event; LF_DEBUG(TRIG, "Preparing timer %p", _self); diff --git a/src/trigger.c b/src/trigger.c index 7e2c4025..395d135d 100644 --- a/src/trigger.c +++ b/src/trigger.c @@ -1,9 +1,5 @@ #include "reactor-uc/trigger.h" #include "reactor-uc/environment.h" -#include "reactor-uc/logging.h" -#include "reactor-uc/timer.h" - -#include void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, EventPayloadPool *payload_pool, void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *)) { From 270975db85915e69b3a78164e29afe171ea61e18 Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 01:11:42 +0200 Subject: [PATCH 03/10] use hooks --- CMakeLists.txt | 2 +- src/platform/posix/tcp_ip_channel.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5df2d9d2..ef79a893 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.9) project(reactor-uc LANGUAGES C) # Command line options for the build -set(BUILD_EXAMPLES ON CACHE BOOL "Build examples") +set(BUILD_EXAMPLES OFF CACHE BOOL "Build examples") set(BUILD_TESTS OFF CACHE BOOL "Build all tests") set(BUILD_LF_TESTS OFF CACHE BOOL "Build lf tests") set(BUILD_UNIT_TESTS OFF CACHE BOOL "Build unit tests") diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 0890d6c4..21757687 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -98,7 +98,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) } // serializing protobuf into buffer - int message_size = encode_protobuf(message, self->write_buffer, TCP_IP_CHANNEL_BUFFERSIZE); + int message_size = self->encode_hook(message, self->write_buffer, TCP_IP_CHANNEL_BUFFERSIZE); if (message_size < 0) { LF_ERR(NET, "Could not encode protobuf"); @@ -159,7 +159,7 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { } self->read_index += bytes_read; - bytes_left = decode_protobuf(&self->output, self->read_buffer, self->read_index); + bytes_left = self->decode_hook(&self->output, self->read_buffer, self->read_index); if (bytes_left < 0) { read_more = true; } else { From f8f9e72f1743c35eb21d4916aadd7d1dda2397fa Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 02:48:49 +0200 Subject: [PATCH 04/10] moving serialization/deserialization into FederatedConnection --- examples/posix/testing_fed_conn.c | 12 ++- include/reactor-uc/federated.h | 14 +++- include/reactor-uc/network_channel.h | 6 -- .../platform/posix/tcp_ip_channel.h | 3 - src/federated.c | 74 +++++++++++++------ src/platform/posix/tcp_ip_channel.c | 18 +---- 6 files changed, 76 insertions(+), 51 deletions(-) diff --git a/examples/posix/testing_fed_conn.c b/examples/posix/testing_fed_conn.c index 8bef6415..827172c4 100644 --- a/examples/posix/testing_fed_conn.c +++ b/examples/posix/testing_fed_conn.c @@ -89,6 +89,7 @@ typedef struct { TcpIpChannel channel; ConnSender conn; FederatedOutputConnection *output[1]; + serialize_hook* serialize_hooks[1]; } SenderRecvBundle; void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { @@ -106,8 +107,9 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { validate(new_connection); printf("Sender: Accepted\n"); - FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, 0, - (FederatedOutputConnection **)&self->output, 1); + FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, + NULL, NULL, 0, + (FederatedOutputConnection **)&self->output, self->serialize_hooks, 1); } DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, MSEC(100), false) @@ -117,6 +119,7 @@ typedef struct { TcpIpChannel channel; ConnRecv conn; FederatedInputConnection *inputs[1]; + deserialize_hook* deserialize_hooks[1]; } RecvSenderBundle; void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { @@ -133,8 +136,9 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { validate(ret == LF_OK); printf("Recv: Connected\n"); - FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs, - 1, NULL, 0); + FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, + (FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1, + NULL, NULL, 0); } // Reactor main diff --git a/include/reactor-uc/federated.h b/include/reactor-uc/federated.h index 9642d9bf..0b8b66e8 100644 --- a/include/reactor-uc/federated.h +++ b/include/reactor-uc/federated.h @@ -10,6 +10,12 @@ typedef struct FederatedOutputConnection FederatedOutputConnection; typedef struct FederatedInputConnection FederatedInputConnection; typedef struct NetworkChannel NetworkChannel; +// returns how many bytes of the buffer were used by the serialized string +typedef size_t (*serialize_hook)(const void *user_struct, size_t user_struct_size, unsigned char *msg_buffer); + +// returns if the deserialization was successful +typedef lf_ret_t (*deserialize_hook)(void *user_struct, const unsigned char *msg_buffer, size_t msg_size); + // Wrapping all connections going both ways between this federated and // another federated of. struct FederatedConnectionBundle { @@ -17,16 +23,20 @@ struct FederatedConnectionBundle { NetworkChannel *net_channel; // Pointer to the network super doing the actual I/O // Pointer to an array of input connections which should live in the derived struct. FederatedInputConnection **inputs; + deserialize_hook *deserialize_hooks; size_t inputs_size; + // Pointer to an array of output connections which should live in the derived struct. FederatedOutputConnection **outputs; + serialize_hook *serialize_hook; size_t outputs_size; bool server; // Does this federate work as server or client }; void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel, - FederatedInputConnection **inputs, size_t inputs_size, - FederatedOutputConnection **outputs, size_t outputs_size); + FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks, + size_t inputs_size, FederatedOutputConnection **outputs, + serialize_hook *serialize_hooks, size_t outputs_size); // A single output connection from this federate. Might connect to several // downstream ports, but all of them must be in the same federate diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 2beeb6a8..194599c2 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -12,9 +12,6 @@ typedef struct FederatedConnectionBundle FederatedConnectionBundle; typedef struct NetworkChannel NetworkChannel; -typedef int (*encode_message_hook)(const void *message, unsigned char *buffer, size_t buffer_size); -typedef int (*decode_message_hook)(void *message, const unsigned char *buffer, size_t buffer_size); - struct NetworkChannel { lf_ret_t (*bind)(NetworkChannel *self); lf_ret_t (*connect)(NetworkChannel *self); @@ -26,9 +23,6 @@ struct NetworkChannel { lf_ret_t (*send)(NetworkChannel *self, TaggedMessage *message); TaggedMessage *(*receive)(NetworkChannel *self); void (*free)(NetworkChannel *self); - - void (*register_encode_hook)(NetworkChannel *self, encode_message_hook encode_hook); - void (*register_decode_hook)(NetworkChannel *self, decode_message_hook decode_hook); }; #endif // REACTOR_UC_NETWORK_CHANNEL_H diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index ef86ad91..cefc7f2e 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -43,9 +43,6 @@ struct TcpIpChannel { FederatedConnectionBundle *federated_connection; void (*receive_callback)(FederatedConnectionBundle *conn, void *message); - - decode_message_hook decode_hook; - encode_message_hook encode_hook; }; void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family); diff --git a/src/federated.c b/src/federated.c index 3ed66340..474f01d7 100644 --- a/src/federated.c +++ b/src/federated.c @@ -3,6 +3,9 @@ #include "reactor-uc/logging.h" #include "reactor-uc/platform.h" +#define MAX(x, y) (((x) > (y)) ? (x) : (y)) +#define MIN(x, y) (((x) < (y)) ? (x) : (y)) + // TODO: Refactor so this function is available void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size); @@ -48,8 +51,9 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { msg.tag.time = sched->current_tag.time; msg.tag.microstep = sched->current_tag.microstep; - memcpy(msg.payload.bytes, self->staged_payload_ptr, self->payload_pool.size); - msg.payload.size = self->payload_pool.size; + size_t msg_size = (*self->bundle->serialize_hook[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size, + msg.payload.bytes); + msg.payload.size = msg_size; LF_DEBUG(FED, "FedOutConn %p sending message with tag=%" PRId64 ":%" PRIu32, trigger, msg.tag.time, msg.tag.microstep); @@ -143,23 +147,28 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, if (ret != LF_OK) { LF_ERR(FED, "Input buffer at Connection %p is full. Dropping incoming msg", input); } else { - memcpy(payload, msg->payload.bytes, msg->payload.size); - Event event = EVENT_INIT(tag, &input->super.super, payload); - ret = sched->schedule_at_locked(sched, &event); - switch (ret) { - case LF_AFTER_STOP_TAG: - LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n"); - break; - case LF_PAST_TAG: - LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n"); - break; - case LF_OK: - env->platform->new_async_event(env->platform); - break; - default: - LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret); - validate(false); - break; + lf_ret_t status = (*self->deserialize_hooks[msg->conn_id])(payload, msg->payload.bytes, msg->payload.size); + + if (status == LF_OK) { + Event event = EVENT_INIT(tag, &input->super.super, payload); + ret = sched->schedule_at_locked(sched, &event); + switch (ret) { + case LF_AFTER_STOP_TAG: + LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n"); + break; + case LF_PAST_TAG: + LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n"); + break; + case LF_OK: + env->platform->new_async_event(env->platform); + break; + default: + LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret); + validate(false); + break; + } + } else { + LF_ERR(FED, "Cannot deserialize message from other Federate. Dropping\n"); } if (lf_tag_compare(input->last_known_tag, tag) < 0) { @@ -171,15 +180,38 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, env->platform->leave_critical_section(env->platform); } +lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { + memcpy(user_struct, msg_buf, MIN(msg_size, 832)); + + return LF_OK; +} + +size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + memcpy(msg_buf, user_struct, MIN(user_struct_size, 832)); + + return MIN(user_struct_size, 832); +} + void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel, - FederatedInputConnection **inputs, size_t inputs_size, - FederatedOutputConnection **outputs, size_t outputs_size) { + FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks, + size_t inputs_size, FederatedOutputConnection **outputs, + serialize_hook *serialize_hooks, size_t outputs_size) { self->inputs = inputs; self->inputs_size = inputs_size; self->net_channel = net_channel; self->outputs = outputs; self->outputs_size = outputs_size; self->parent = parent; + self->deserialize_hooks = deserialize_hooks; + self->serialize_hook = serialize_hooks; + + for (size_t i = 0; i < inputs_size; i++) { + self->deserialize_hooks[i] = standard_deserialization; + } + for (size_t i = 0; i < outputs_size; i++) { + self->serialize_hook[i] = standard_serialization; + } + // Register callback function for message received. self->net_channel->register_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); } diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 21757687..470482aa 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -98,7 +98,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) } // serializing protobuf into buffer - int message_size = self->encode_hook(message, self->write_buffer, TCP_IP_CHANNEL_BUFFERSIZE); + int message_size = encode_protobuf(message, self->write_buffer, TCP_IP_CHANNEL_BUFFERSIZE); if (message_size < 0) { LF_ERR(NET, "Could not encode protobuf"); @@ -159,7 +159,7 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { } self->read_index += bytes_read; - bytes_left = self->decode_hook(&self->output, self->read_buffer, self->read_index); + bytes_left = decode_protobuf(&self->output, self->read_buffer, self->read_index); if (bytes_left < 0) { read_more = true; } else { @@ -224,7 +224,7 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, } #else if (pthread_attr_setstack(&self->receive_thread_attr, &self->receive_thread_stack, - TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE - TCP_IP_CHANNEL_RECV_THREAD_STACK_GUARD_SIZE) != 0) { + TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE - TCP_IP_CHANNEL_RECV_THREAD_STACK_GUARD_SIZE) < 0) { throw("pthread_attr_setstack failed"); } #endif @@ -234,14 +234,6 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, } } -void TcpIpChannel_register_decode_hook(NetworkChannel *self, decode_message_hook hook) { - ((TcpIpChannel *)self)->decode_hook = hook; -} - -void TcpIpChannel_register_encode_hook(NetworkChannel *self, encode_message_hook hook) { - ((TcpIpChannel *)self)->encode_hook = hook; -} - void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family) { FD_ZERO(&self->set); @@ -264,13 +256,9 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->super.receive = TcpIpChannel_receive; self->super.send = TcpIpChannel_send; self->super.register_callback = TcpIpChannel_register_callback; - self->super.register_decode_hook = TcpIpChannel_register_decode_hook; - self->super.register_encode_hook = TcpIpChannel_register_encode_hook; self->super.free = TcpIpChannel_free; self->receive_callback = NULL; self->federated_connection = NULL; - self->decode_hook = decode_protobuf; - self->encode_hook = encode_protobuf; } void TcpIpChannel_free(NetworkChannel *untyped_self) { From 4922a0915758adcab80747dfb2a2c5cc9bceabaa Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 03:05:54 +0200 Subject: [PATCH 05/10] adding example with custom serialize/deserialize --- examples/posix/testing_fed_conn.c | 28 ++++++++++++++++++++++++++-- src/federated.c | 7 ------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/examples/posix/testing_fed_conn.c b/examples/posix/testing_fed_conn.c index 827172c4..b67a9767 100644 --- a/examples/posix/testing_fed_conn.c +++ b/examples/posix/testing_fed_conn.c @@ -7,9 +7,28 @@ #define PORT_NUM 8901 typedef struct { - char msg[32]; + int size; + char msg[512]; } msg_t; +lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { + msg_t* msg = user_struct; + memcpy(&msg->size, msg_buf, sizeof(msg->size)); + memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size)); + + return LF_OK; +} + +size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + const msg_t* msg = user_struct; + + memcpy(msg_buf, &msg->size, sizeof(msg->size)); + memcpy(msg_buf + sizeof(msg->size), msg->msg, sizeof(msg->size)); + + return sizeof(msg->size) + msg->size; +} + + DEFINE_TIMER_STRUCT(Timer1, 1) DEFINE_TIMER_CTOR_FIXED(Timer1, 1, MSEC(0), SEC(1)) DEFINE_REACTION_STRUCT(Sender, 0, 1) @@ -33,6 +52,7 @@ DEFINE_REACTION_BODY(Sender, 0) { printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env)); msg_t val; strcpy(val.msg, "Hello From Sender"); + val.size = sizeof("Hello From Sender"); lf_set(out, val); } DEFINE_REACTION_CTOR(Sender, 0) @@ -67,7 +87,7 @@ DEFINE_REACTION_BODY(Receiver, 0) { Receiver *self = (Receiver *)_self->parent; Environment *env = self->super.env; In *inp = &self->inp; - printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), inp->value.msg); + printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg, inp->value.size); } DEFINE_REACTION_CTOR(Receiver, 0) @@ -107,6 +127,8 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { validate(new_connection); printf("Sender: Accepted\n"); + self->serialize_hooks[0] = serialize_msg_t; + FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, NULL, 0, (FederatedOutputConnection **)&self->output, self->serialize_hooks, 1); @@ -136,6 +158,8 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { validate(ret == LF_OK); printf("Recv: Connected\n"); + self->deserialize_hooks[0] = deserialize_msg_t; + FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1, NULL, NULL, 0); diff --git a/src/federated.c b/src/federated.c index 474f01d7..830bf74c 100644 --- a/src/federated.c +++ b/src/federated.c @@ -205,13 +205,6 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa self->deserialize_hooks = deserialize_hooks; self->serialize_hook = serialize_hooks; - for (size_t i = 0; i < inputs_size; i++) { - self->deserialize_hooks[i] = standard_deserialization; - } - for (size_t i = 0; i < outputs_size; i++) { - self->serialize_hook[i] = standard_serialization; - } - // Register callback function for message received. self->net_channel->register_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); } From ebecde980030306b1dfda881041034882048ee4f Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 03:10:20 +0200 Subject: [PATCH 06/10] fix array --- examples/posix/testing_fed_conn.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/posix/testing_fed_conn.c b/examples/posix/testing_fed_conn.c index b67a9767..5322e309 100644 --- a/examples/posix/testing_fed_conn.c +++ b/examples/posix/testing_fed_conn.c @@ -109,7 +109,7 @@ typedef struct { TcpIpChannel channel; ConnSender conn; FederatedOutputConnection *output[1]; - serialize_hook* serialize_hooks[1]; + serialize_hook serialize_hooks[1]; } SenderRecvBundle; void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { @@ -141,7 +141,7 @@ typedef struct { TcpIpChannel channel; ConnRecv conn; FederatedInputConnection *inputs[1]; - deserialize_hook* deserialize_hooks[1]; + deserialize_hook deserialize_hooks[1]; } RecvSenderBundle; void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { From 90fb89ca14bfc1a6810e01a28b160ffc8614765b Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 03:17:54 +0200 Subject: [PATCH 07/10] old callback interface --- include/reactor-uc/encoding.h | 8 ++------ include/reactor-uc/network_channel.h | 2 +- include/reactor-uc/platform/posix/tcp_ip_channel.h | 2 +- src/federated.c | 4 +--- src/platform/posix/tcp_ip_channel.c | 2 +- 5 files changed, 6 insertions(+), 12 deletions(-) diff --git a/include/reactor-uc/encoding.h b/include/reactor-uc/encoding.h index 80ce6821..76fa5576 100644 --- a/include/reactor-uc/encoding.h +++ b/include/reactor-uc/encoding.h @@ -5,9 +5,7 @@ #include #include -int encode_protobuf(const void *raw_message, unsigned char *buffer, size_t buffer_size) { - const TaggedMessage *message = raw_message; - +int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t buffer_size) { // turing write buffer into pb_ostream buffer pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size); @@ -19,9 +17,7 @@ int encode_protobuf(const void *raw_message, unsigned char *buffer, size_t buffe return (int)stream_out.bytes_written; } -int decode_protobuf(void *raw_message, const unsigned char *buffer, size_t buffer_size) { - TaggedMessage *message = raw_message; - +int decode_protobuf(TaggedMessage *message, const unsigned char *buffer, size_t buffer_size) { pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size); if (!pb_decode(&stream_in, TaggedMessage_fields, message)) { diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 194599c2..1fd4b83a 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -18,7 +18,7 @@ struct NetworkChannel { bool (*accept)(NetworkChannel *self); void (*close)(NetworkChannel *self); void (*register_callback)(NetworkChannel *self, - void (*receive_callback)(FederatedConnectionBundle *conn, void *message), + void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), FederatedConnectionBundle *conn); lf_ret_t (*send)(NetworkChannel *self, TaggedMessage *message); TaggedMessage *(*receive)(NetworkChannel *self); diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index cefc7f2e..e3491414 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -42,7 +42,7 @@ struct TcpIpChannel { char receive_thread_stack[TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE]; FederatedConnectionBundle *federated_connection; - void (*receive_callback)(FederatedConnectionBundle *conn, void *message); + void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message); }; void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family); diff --git a/src/federated.c b/src/federated.c index 830bf74c..e3f2d8f9 100644 --- a/src/federated.c +++ b/src/federated.c @@ -116,9 +116,7 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare // Callback registered with the NetworkChannel. Is called asynchronously when there is a // a TaggedMessage available. -void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, void *raw_msg) { - TaggedMessage *msg = raw_msg; - +void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, TaggedMessage *msg) { LF_DEBUG(FED, "Callback on FedConnBundle %p for message with tag=%" PRId64 ":%" PRIu32, self, msg->tag.time, msg->tag.microstep); assert(((size_t)msg->conn_id) < self->inputs_size); diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 470482aa..df34775e 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -201,7 +201,7 @@ void *TcpIpChannel_receive_thread(void *untyped_self) { } void TcpIpChannel_register_callback(NetworkChannel *untyped_self, - void (*receive_callback)(FederatedConnectionBundle *conn, void *msg), + void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *msg), FederatedConnectionBundle *conn) { int res; LF_INFO(NET, "TCP/IP registering callback thread"); From cf4fdf9288d58ad0b4a949e373ff7ce587a3bf6c Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 03:23:02 +0200 Subject: [PATCH 08/10] clean up --- examples/posix/testing_tcp_ip_channel_server_callback.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/posix/testing_tcp_ip_channel_server_callback.c b/examples/posix/testing_tcp_ip_channel_server_callback.c index c922009e..f5191fa2 100644 --- a/examples/posix/testing_tcp_ip_channel_server_callback.c +++ b/examples/posix/testing_tcp_ip_channel_server_callback.c @@ -5,9 +5,7 @@ #include TcpIpChannel channel; -void callback_handler(FederatedConnectionBundle *self, void *raw_msg) { - TaggedMessage* msg = raw_msg; - +void callback_handler(FederatedConnectionBundle *self, TcpIpChannel *msg) { printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); channel.super.send(&channel.super, msg); } From 67f38f5e0657b40204722bb4c0812e8fea44161f Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 03:25:03 +0200 Subject: [PATCH 09/10] fix --- examples/posix/testing_tcp_ip_channel_server_callback.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/posix/testing_tcp_ip_channel_server_callback.c b/examples/posix/testing_tcp_ip_channel_server_callback.c index f5191fa2..c6a4e50c 100644 --- a/examples/posix/testing_tcp_ip_channel_server_callback.c +++ b/examples/posix/testing_tcp_ip_channel_server_callback.c @@ -5,7 +5,7 @@ #include TcpIpChannel channel; -void callback_handler(FederatedConnectionBundle *self, TcpIpChannel *msg) { +void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); channel.super.send(&channel.super, msg); } From a3387fdcf5d63dab34380cac5b6a428bd678107e Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 03:40:50 +0200 Subject: [PATCH 10/10] last fixes --- include/reactor-uc/federated.h | 2 +- src/federated.c | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/reactor-uc/federated.h b/include/reactor-uc/federated.h index 0b8b66e8..7b10f3cf 100644 --- a/include/reactor-uc/federated.h +++ b/include/reactor-uc/federated.h @@ -28,7 +28,7 @@ struct FederatedConnectionBundle { // Pointer to an array of output connections which should live in the derived struct. FederatedOutputConnection **outputs; - serialize_hook *serialize_hook; + serialize_hook *serialize_hooks; size_t outputs_size; bool server; // Does this federate work as server or client }; diff --git a/src/federated.c b/src/federated.c index e3f2d8f9..0f7d10a3 100644 --- a/src/federated.c +++ b/src/federated.c @@ -3,7 +3,7 @@ #include "reactor-uc/logging.h" #include "reactor-uc/platform.h" -#define MAX(x, y) (((x) > (y)) ? (x) : (y)) +#undef MIN #define MIN(x, y) (((x) < (y)) ? (x) : (y)) // TODO: Refactor so this function is available @@ -51,8 +51,8 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { msg.tag.time = sched->current_tag.time; msg.tag.microstep = sched->current_tag.microstep; - size_t msg_size = (*self->bundle->serialize_hook[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size, - msg.payload.bytes); + size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size, + msg.payload.bytes); msg.payload.size = msg_size; LF_DEBUG(FED, "FedOutConn %p sending message with tag=%" PRId64 ":%" PRIu32, trigger, msg.tag.time, @@ -201,7 +201,7 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa self->outputs_size = outputs_size; self->parent = parent; self->deserialize_hooks = deserialize_hooks; - self->serialize_hook = serialize_hooks; + self->serialize_hooks = serialize_hooks; // Register callback function for message received. self->net_channel->register_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self);