diff --git a/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c b/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c index f7b5ccd9..33d478b3 100644 --- a/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c +++ b/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c @@ -1,6 +1,7 @@ #include "reactor-uc/logging.h" #include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/reactor-uc.h" +#include "reactor-uc/serialization.h" #include #include @@ -68,12 +69,25 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { NetworkChannel *chan = &self->chan.super; chan->open_connection(chan); - lf_ret_t ret; LF_DEBUG(ENV, "Recv: Connecting"); - do { + lf_ret_t ret = LF_TRY_AGAIN; + while (ret != LF_OK) { ret = chan->try_connect(chan); - } while (ret != LF_OK); + switch (ret) { + case LF_OK: + break; + case LF_IN_PROGRESS: + case LF_TRY_AGAIN: + k_msleep(100); + break; + default: + printf("Sender: Could not accept\n"); + exit(1); + break; + } + } LF_DEBUG(ENV, "Recv: Connected"); + self->deserialize_hooks[0] = deserialize_payload_default; FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1, NULL, NULL, 0); @@ -88,8 +102,6 @@ typedef struct { } MainRecv; void MainRecv_ctor(MainRecv *self, Environment *env) { - env->wait_until(env, env->get_physical_time(env) + - MSEC(100)); // FIXME: This is a hack until we have proper connection setup self->_children[0] = &self->receiver.super; Receiver_ctor(&self->receiver, &self->super, env); diff --git a/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c b/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c index 49dc182d..1c02df83 100644 --- a/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c +++ b/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c @@ -1,4 +1,5 @@ #include "reactor-uc/logging.h" +#include "reactor-uc/serialization.h" #include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/reactor-uc.h" #include @@ -68,12 +69,25 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { NetworkChannel *chan = &self->chan.super; chan->open_connection(chan); - lf_ret_t ret; LF_DEBUG(ENV, "Recv: Connecting"); - do { + lf_ret_t ret = LF_TRY_AGAIN; + while (ret != LF_OK) { ret = chan->try_connect(chan); - } while (ret != LF_OK); + switch (ret) { + case LF_OK: + break; + case LF_IN_PROGRESS: + case LF_TRY_AGAIN: + k_msleep(100); + break; + default: + printf("Sender: Could not accept\n"); + exit(1); + break; + } + } LF_DEBUG(ENV, "Recv: Connected"); + self->deserialize_hooks[0] = deserialize_payload_default; FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1, NULL, NULL, 0); @@ -88,8 +102,6 @@ typedef struct { } MainRecv; void MainRecv_ctor(MainRecv *self, Environment *env) { - env->wait_until(env, env->get_physical_time(env) + - MSEC(400)); // FIXME: This is a hack until we have proper connection setup self->_children[0] = &self->receiver.super; Receiver_ctor(&self->receiver, &self->super, env); diff --git a/examples/zephyr/basic_federated/federated_sender/src/sender.c b/examples/zephyr/basic_federated/federated_sender/src/sender.c index 9cbeb486..16c66d06 100644 --- a/examples/zephyr/basic_federated/federated_sender/src/sender.c +++ b/examples/zephyr/basic_federated/federated_sender/src/sender.c @@ -1,5 +1,6 @@ #include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/reactor-uc.h" +#include "reactor-uc/serialization.h" #include #include @@ -127,11 +128,25 @@ void SenderRecv1Bundle_ctor(SenderRecv1Bundle *self, Reactor *parent) { printf("Sender: Bound 1\n"); // accept one connection - do { + ret = LF_TRY_AGAIN; + while (ret != LF_OK) { ret = chan->try_connect(chan); - } while (ret != LF_OK); - validate(ret == LF_OK); + switch (ret) { + case LF_OK: + break; + case LF_IN_PROGRESS: + case LF_TRY_AGAIN: + k_msleep(100); + break; + break; + default: + printf("Sender: Could not accept\n"); + exit(1); + break; + } + } printf("Sender: Accepted 1\n"); + self->serialize_hooks[0] = serialize_payload_default; FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0, (FederatedOutputConnection **)&self->output, self->serialize_hooks, 1); @@ -148,12 +163,24 @@ void SenderRecv2Bundle_ctor(SenderRecv2Bundle *self, Reactor *parent) { printf("Sender: Bound 2\n"); // accept one connection - do { + ret = LF_TRY_AGAIN; + while (ret != LF_OK) { ret = chan->try_connect(chan); - } while (ret != LF_OK); - validate(ret == LF_OK); - printf("Sender: Accepted 2\n"); + switch (ret) { + case LF_OK: + break; + case LF_TRY_AGAIN: + k_msleep(100); + break; + default: + printf("Sender: Could not accept\n"); + exit(1); + break; + } + } + printf("Sender: Accepted 2\n"); + self->serialize_hooks[0] = serialize_payload_default; FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0, (FederatedOutputConnection **)&self->output, self->serialize_hooks, 1); } diff --git a/include/reactor-uc/encoding.h b/include/reactor-uc/encoding.h deleted file mode 100644 index 798bfe03..00000000 --- a/include/reactor-uc/encoding.h +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef REACTOR_UC_ENCODING_H -#define REACTOR_UC_ENCODING_H - -#include "proto/message.pb.h" -#include -#include - -int encode_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size) { - // turing write buffer into pb_ostream buffer - pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size); - - // serializing protobuf into buffer - if (!pb_encode(&stream_out, FederateMessage_fields, message)) { - return -1; - } - - return (int)stream_out.bytes_written; -} - -int decode_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size) { - pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size); - - if (!pb_decode(&stream_in, FederateMessage_fields, message)) { - return -1; - } - - return (int)stream_in.bytes_left; -} - -#endif // REACTOR_UC_ENCODING_H diff --git a/include/reactor-uc/error.h b/include/reactor-uc/error.h index 831fcb4b..08bd8a4b 100644 --- a/include/reactor-uc/error.h +++ b/include/reactor-uc/error.h @@ -21,7 +21,10 @@ typedef enum { LF_OUT_OF_BOUNDS, LF_NO_MEM, LF_COULD_NOT_CONNECT, - LF_NETWORK_SETUP_FAILED + LF_NETWORK_SETUP_FAILED, + LF_TIMEOUT, + LF_TRY_AGAIN, + LF_IN_PROGRESS } lf_ret_t; /** diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 153b822c..0ba783e1 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -15,6 +15,11 @@ typedef struct NetworkChannel NetworkChannel; struct NetworkChannel { size_t dest_channel_id; // So that we can "address" one of several NetworkChannel's at the other end. lf_ret_t (*open_connection)(NetworkChannel *self); + + /** Try to connect to corresponding NetworkChannel on another federate. + * @return LF_OK if connection is established, LF_IN_PROGRESS if connection is in progress, LF_TRY_AGAIN if connection + * failed and should be retried, LF_ERR if connection failed and should not be retried. + */ lf_ret_t (*try_connect)(NetworkChannel *self); void (*close_connection)(NetworkChannel *self); lf_ret_t (*send_blocking)(NetworkChannel *self, const FederateMessage *message); diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index 74a2cf6d..5c4e2f3c 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -21,6 +21,7 @@ struct TcpIpChannel { int fd; int client; + bool client_connect_in_progress; const char *host; unsigned short port; diff --git a/include/reactor-uc/reactor-uc.h b/include/reactor-uc/reactor-uc.h index cc055d11..1d588bd6 100644 --- a/include/reactor-uc/reactor-uc.h +++ b/include/reactor-uc/reactor-uc.h @@ -11,6 +11,7 @@ #include "reactor-uc/macros.h" #include "reactor-uc/platform.h" #include "reactor-uc/port.h" +#include "reactor-uc/serialization.h" #include "reactor-uc/reaction.h" #include "reactor-uc/reactor.h" #include "reactor-uc/tag.h" diff --git a/include/reactor-uc/serialization.h b/include/reactor-uc/serialization.h new file mode 100644 index 00000000..7f444a6f --- /dev/null +++ b/include/reactor-uc/serialization.h @@ -0,0 +1,13 @@ +#ifndef REACTOR_UC_SERIALIZATION_H +#define REACTOR_UC_SERIALIZATION_H + +#include "proto/message.pb.h" +#include "reactor-uc/error.h" + +int serialize_to_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size); +int deserialize_from_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size); + +lf_ret_t deserialize_payload_default(void *user_struct, const unsigned char *msg_buf, size_t msg_size); + +size_t serialize_payload_default(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf); +#endif // REACTOR_UC_SERIALIZATION_H diff --git a/src/action.c b/src/action.c index 9a4b0b80..de8c3374 100644 --- a/src/action.c +++ b/src/action.c @@ -34,6 +34,14 @@ lf_ret_t Action_schedule(Action *self, interval_t offset, const void *value) { env->enter_critical_section(env); + // Dont accept events before we have started + // TODO: Do we instead need some flag to signal that we have started? + if (sched->start_time == NEVER) { + LF_ERR(TRIG, "Action %p cannot schedule events before start tag", self); + env->leave_critical_section(env); + return LF_ERR; + } + ret = self->super.payload_pool->allocate(self->super.payload_pool, &payload); if (ret != LF_OK) { return ret; diff --git a/src/federated.c b/src/federated.c index 724575c9..63810b16 100644 --- a/src/federated.c +++ b/src/federated.c @@ -3,9 +3,6 @@ #include "reactor-uc/logging.h" #include "reactor-uc/platform.h" -#undef MIN -#define MIN(x, y) (((x) < (y)) ? (x) : (y)) - // TODO: Refactor so this function is available void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size); @@ -55,6 +52,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { tagged_msg->tag.time = sched->current_tag.time; tagged_msg->tag.microstep = sched->current_tag.microstep; + assert(self->bundle->serialize_hooks[self->conn_id]); size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size, tagged_msg->payload.bytes); tagged_msg->payload.size = msg_size; @@ -158,6 +156,13 @@ void FederatedConnectionBundle_handle_tagged_msg(FederatedConnectionBundle *self EventPayloadPool *pool = &input->payload_pool; env->platform->enter_critical_section(env->platform); + // Verify that we have started executing and can actually handle it + if (sched->start_time == NEVER) { + LF_ERR(FED, "Received message before start tag. Dropping"); + env->platform->leave_critical_section(env->platform); + return; + } + tag_t base_tag = ZERO_TAG; if (input->type == PHYSICAL_CONNECTION) { base_tag.time = env->get_physical_time(env); @@ -222,16 +227,6 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, } } -lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { - memcpy(user_struct, msg_buf, MIN(msg_size, 832)); - return LF_OK; -} - -size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { - memcpy(msg_buf, user_struct, MIN(user_struct_size, 832)); - return MIN(user_struct_size, 832); -} - void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel, FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks, size_t inputs_size, FederatedOutputConnection **outputs, @@ -246,6 +241,7 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa self->serialize_hooks = serialize_hooks; // Register callback function for message received. + // TODO: This should probably not happen here. self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); } diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index a5591a5a..1af0eaf8 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -1,5 +1,5 @@ #include "reactor-uc/platform/posix/tcp_ip_channel.h" -#include "reactor-uc/encoding.h" +#include "reactor-uc/serialization.h" #include "reactor-uc/logging.h" #include @@ -20,6 +20,30 @@ #define MIN(a, b) ((a) < (b) ? (a) : (b)) +static lf_ret_t TcpIpChannel_reset_socket(TcpIpChannel *self) { + FD_ZERO(&self->set); + if (self->fd > 0) { + if (close(self->fd) < 0) { + LF_ERR(NET, "Error closing socket %d", errno); + return LF_ERR; + } + } + + if ((self->fd = socket(self->protocol_family, SOCK_STREAM, 0)) < 0) { + LF_ERR(NET, "Error opening socket %d", errno); + return LF_ERR; + } + + if (setsockopt(self->fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) { + LF_ERR(NET, "Error setting socket options %d", errno); + return LF_ERR; + } + + // Set server socket to non-blocking + fcntl(self->fd, F_SETFL, O_NONBLOCK); + + return LF_OK; +} /** * @brief If is server: Bind and Listen for connections * If is client: Do nothing @@ -52,32 +76,73 @@ static lf_ret_t TcpIpChannel_open_connection(NetworkChannel *untyped_self) { return LF_OK; } -/** - * @brief If is server: Try to accept - * If is client: Try to connect - */ -static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) { +static lf_ret_t TcpIpChannel_try_connect_server(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; - /* Server -> Accept */ - if (self->server) { - int new_socket; - struct sockaddr_in address; - socklen_t addrlen = sizeof(address); + int new_socket; + struct sockaddr_in address; + socklen_t addrlen = sizeof(address); + + new_socket = accept(self->fd, (struct sockaddr *)&address, &addrlen); + if (new_socket >= 0) { + self->client = new_socket; + FD_SET(new_socket, &self->set); + return LF_OK; + } else { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + LF_DEBUG(NET, "Accept failed. Try again. (errno=%d)", errno); + return LF_TRY_AGAIN; + } else { + LF_ERR(NET, "Accept failed. Unknown errno=%d", errno); + throw("Accept failed"); + return LF_ERR; + } + } +} - new_socket = accept(self->fd, (struct sockaddr *)&address, &addrlen); - if (new_socket >= 0) { - self->client = new_socket; - FD_SET(new_socket, &self->set); +static lf_ret_t TcpIpChannel_check_if_socket_is_writable(int fd) { + fd_set set; + FD_ZERO(&set); + FD_SET(fd, &set); + struct timeval tv = {.tv_sec = 0, .tv_usec = 1000}; + int ret = select(fd + 1, NULL, &set, NULL, &tv); + if (ret > 0) { + if (FD_ISSET(fd, &set)) { return LF_OK; + } else { + LF_DEBUG(NET, "Select: socket not writable yet"); + return LF_TRY_AGAIN; } + } else if (ret == 0) { + // Timeout + LF_DEBUG(NET, "Select timed out"); + return LF_TIMEOUT; + } else { + LF_DEBUG(NET, "Select failed with errno=%d", errno); + return LF_ERR; + } +} - return LF_COULD_NOT_CONNECT; +static lf_ret_t TcpIpChannel_check_socket_error(int fd) { + int so_error; + socklen_t len = sizeof(so_error); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) { + return LF_ERR; } + if (so_error == 0) { + return LF_OK; + } else { + return LF_ERR; + } +} - /* Client -> Connect */ - if (!self->server) { +static lf_ret_t TcpIpChannel_try_connect_client(NetworkChannel *untyped_self) { + TcpIpChannel *self = (TcpIpChannel *)untyped_self; + lf_ret_t lf_ret; + + if (!self->client_connect_in_progress) { + // First time trying to connect struct sockaddr_in serv_addr; serv_addr.sin_family = self->protocol_family; @@ -89,11 +154,53 @@ static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) { int ret = connect(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); if (ret < 0) { - return LF_COULD_NOT_CONNECT; + if (errno == EINPROGRESS) { + self->client_connect_in_progress = true; + LF_DEBUG(NET, "Connection in progress!"); + return LF_IN_PROGRESS; + } else { + LF_ERR(NET, "Connect failed errno=%d", errno); + self->client_connect_in_progress = false; + TcpIpChannel_reset_socket(self); + return LF_TRY_AGAIN; + } + } + } else { + // Connection is in progress + lf_ret = TcpIpChannel_check_if_socket_is_writable(self->fd); + if (lf_ret == LF_OK) { + LF_DEBUG(NET, "Socket is writable"); + lf_ret = TcpIpChannel_check_socket_error(self->fd); + if (lf_ret == LF_OK) { + LF_DEBUG(NET, "Connection succeeded"); + self->client_connect_in_progress = false; + return LF_OK; + } else { + self->client_connect_in_progress = false; + LF_ERR(NET, "Connection failed"); + TcpIpChannel_reset_socket(self); + return LF_TRY_AGAIN; + } + } else if (lf_ret == LF_TIMEOUT) { + LF_ERR(NET, "Select timed out"); + return LF_IN_PROGRESS; + } else { + self->client_connect_in_progress = false; + TcpIpChannel_reset_socket(self); + LF_ERR(NET, "Select failed errno=%d", errno); + return LF_TRY_AGAIN; } } + return LF_ERR; // Should never reach here +} - return LF_OK; +static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) { + TcpIpChannel *self = (TcpIpChannel *)untyped_self; + if (self->server) { + return TcpIpChannel_try_connect_server(untyped_self); + } else { + return TcpIpChannel_try_connect_client(untyped_self); + } } static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) { @@ -110,7 +217,7 @@ static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const F } // serializing protobuf into buffer - int message_size = encode_protobuf(message, self->write_buffer, TCP_IP_CHANNEL_BUFFERSIZE); + int message_size = serialize_to_protobuf(message, self->write_buffer, TCP_IP_CHANNEL_BUFFERSIZE); if (message_size < 0) { LF_ERR(NET, "Could not encode protobuf"); @@ -170,7 +277,7 @@ const FederateMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { } self->read_index += bytes_read; - bytes_left = decode_protobuf(&self->output, self->read_buffer, self->read_index); + bytes_left = deserialize_from_protobuf(&self->output, self->read_buffer, self->read_index); if (bytes_left < 0) { read_more = true; } else { @@ -224,6 +331,22 @@ static void TcpIpChannel_register_receive_callback(NetworkChannel *untyped_self, int res; LF_INFO(NET, "TCP/IP registering callback thread"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; + int fd; + if (self->server) { + fd = self->client; + } else { + fd = self->fd; + } + + // Set socket to blocking + int opts = fcntl(fd, F_GETFL); + if (opts < 0) { + throw("Could not get socket options"); + } + opts = (opts & (~O_NONBLOCK)); + if (fcntl(fd, F_SETFL, opts) < 0) { + throw("Could not set socket to blocking"); + } self->receive_callback = receive_callback; self->federated_connection = conn; @@ -280,19 +403,6 @@ static void TcpIpChannel_free(NetworkChannel *untyped_self) { } void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family, bool server) { - FD_ZERO(&self->set); - - if ((self->fd = socket(protocol_family, SOCK_STREAM, 0)) < 0) { - throw("Error creating socket"); - } - if (setsockopt(self->fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) { - throw("Could not set SO_REUSEADDR"); - } - - // Set server socket to non-blocking - if (server) { - fcntl(self->fd, F_SETFL, O_NONBLOCK); - } self->server = server; self->terminate = true; @@ -301,6 +411,8 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->port = port; self->read_index = 0; self->client = 0; + self->fd = 0; + self->client_connect_in_progress = false; self->super.open_connection = TcpIpChannel_open_connection; self->super.try_connect = TcpIpChannel_try_connect; @@ -310,6 +422,7 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->super.free = TcpIpChannel_free; self->receive_callback = NULL; self->federated_connection = NULL; - self->receive_thread = 0; + + TcpIpChannel_reset_socket(self); } \ No newline at end of file diff --git a/src/serialization.c b/src/serialization.c new file mode 100644 index 00000000..98007954 --- /dev/null +++ b/src/serialization.c @@ -0,0 +1,41 @@ +#include "reactor-uc/serialization.h" + +#include +#include + +#ifdef MIN +#undef MIN +#endif +#define MIN(x, y) (((x) < (y)) ? (x) : (y)) + +int serialize_to_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size) { + // turing write buffer into pb_ostream buffer + pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size); + + // serializing protobuf into buffer + if (!pb_encode(&stream_out, FederateMessage_fields, message)) { + return -1; + } + + return (int)stream_out.bytes_written; +} + +int deserialize_from_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size) { + pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size); + + if (!pb_decode(&stream_in, FederateMessage_fields, message)) { + return -1; + } + + return (int)stream_in.bytes_left; +} + +lf_ret_t deserialize_payload_default(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { + memcpy(user_struct, msg_buf, MIN(msg_size, 832)); // TODO: 832 is a magic number + return LF_OK; +} + +size_t serialize_payload_default(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + memcpy(msg_buf, user_struct, MIN(user_struct_size, 832)); // TODO: 832 is a magic number + return MIN(user_struct_size, 832); // TODO: 832 is a magic number +} diff --git a/test/unit/nanopb_test.c b/test/unit/nanopb_test.c index 1d646bd7..adbef5db 100644 --- a/test/unit/nanopb_test.c +++ b/test/unit/nanopb_test.c @@ -3,7 +3,7 @@ #include "unity.h" #include "proto/message.pb.h" -#include "reactor-uc/encoding.h" +#include "reactor-uc/serialization.h" #define BUFFER_SIZE 1024 #define MSG_ID 42 @@ -28,10 +28,10 @@ void test_nanopb() { original_message->payload.size = sizeof("Hello World1234"); message = buffer; - message_size = encode_protobuf(&_original_msg, buffer, BUFFER_SIZE); + message_size = serialize_to_protobuf(&_original_msg, buffer, BUFFER_SIZE); TEST_ASSERT_TRUE(message_size > 0); - int remaining_bytes = decode_protobuf(&_deserialized_msg, message, message_size); + int remaining_bytes = deserialize_from_protobuf(&_deserialized_msg, message, message_size); TEST_ASSERT_TRUE(remaining_bytes >= 0);