diff --git a/examples/posix/federated/receiver.c b/examples/posix/federated/receiver.c index a9298d1c..5dc8ab7c 100644 --- a/examples/posix/federated/receiver.c +++ b/examples/posix/federated/receiver.c @@ -14,7 +14,7 @@ typedef struct { lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { msg_t *msg = user_struct; memcpy(&msg->size, msg_buf, sizeof(msg->size)); - memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size)); + memcpy(msg->msg, msg_buf + sizeof(msg->size), msg->size); return LF_OK; } @@ -36,7 +36,8 @@ DEFINE_REACTION_BODY(Receiver, 0) { Receiver *self = (Receiver *)_self->parent; Environment *env = self->super.env; In *inp = &self->inp; - printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), inp->value.msg); + printf("Input triggered @ %" PRId64 " with %s size %d\n", env->get_elapsed_logical_time(env), inp->value.msg, + inp->value.size); } DEFINE_REACTION_CTOR(Receiver, 0) diff --git a/examples/posix/federated/sender.c b/examples/posix/federated/sender.c index 523d9cac..fc7dfcf1 100644 --- a/examples/posix/federated/sender.c +++ b/examples/posix/federated/sender.c @@ -16,7 +16,7 @@ size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigne const msg_t *msg = user_struct; memcpy(msg_buf, &msg->size, sizeof(msg->size)); - memcpy(msg_buf + sizeof(msg->size), msg->msg, sizeof(msg->size)); + memcpy(msg_buf + sizeof(msg->size), msg->msg, msg->size); return sizeof(msg->size) + msg->size; } @@ -62,39 +62,6 @@ void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **c OUTPUT_REGISTER_SOURCE(self->out, self->reaction); } -DEFINE_REACTION_STRUCT(Receiver, 0, 1) -DEFINE_INPUT_PORT_STRUCT(In, 1, msg_t, 0) -DEFINE_INPUT_PORT_CTOR(In, 1, msg_t, 0) - -typedef struct { - Reactor super; - Receiver_Reaction0 reaction; - In inp; - int cnt; - Reaction *_reactions[1]; - Trigger *_triggers[1]; -} Receiver; - -DEFINE_REACTION_BODY(Receiver, 0) { - Receiver *self = (Receiver *)_self->parent; - Environment *env = self->super.env; - In *inp = &self->inp; - printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg, - inp->value.size); -} -DEFINE_REACTION_CTOR(Receiver, 0) - -void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) { - self->_reactions[0] = (Reaction *)&self->reaction; - self->_triggers[0] = (Trigger *)&self->inp; - Reactor_ctor(&self->super, "Receiver", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); - Receiver_Reaction0_ctor(&self->reaction, &self->super); - In_ctor(&self->inp, &self->super); - - // Register reaction as an effect of in - INPUT_REGISTER_EFFECT(self->inp, self->reaction); -} - DEFINE_FEDERATED_OUTPUT_CONNECTION(ConnSender, msg_t, 1) typedef struct { diff --git a/src/federated.c b/src/federated.c index 349a4d52..b9927113 100644 --- a/src/federated.c +++ b/src/federated.c @@ -146,8 +146,8 @@ void FederatedConnectionBundle_handle_start_tag_signal(FederatedConnectionBundle // a TaggedMessage available. void FederatedConnectionBundle_handle_tagged_msg(FederatedConnectionBundle *self, const FederateMessage *_msg) { const TaggedMessage *msg = &_msg->message.tagged_message; - LF_DEBUG(FED, "Callback on FedConnBundle %p for message with tag=%" PRId64 ":%" PRIu32, self, msg->tag.time, - msg->tag.microstep); + LF_DEBUG(FED, "Callback on FedConnBundle %p for message of size=%u with tag=%" PRId64 ":%" PRIu32, self, + msg->payload.size, msg->tag.time, msg->tag.microstep); assert(((size_t)msg->conn_id) < self->inputs_size); lf_ret_t ret; FederatedInputConnection *input = self->inputs[msg->conn_id]; diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 39526b0a..afa3c203 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -342,6 +342,8 @@ const FederateMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { if (bytes_read < 0) { LF_ERR(NET, "[%s] Error recv from socket %d", self->server ? "server" : "client", errno); continue; + } else if (bytes_read == 0) { + continue; } self->read_index += bytes_read;