Skip to content

Commit

Permalink
Improve handling of non-blocking try-connect (#106)
Browse files Browse the repository at this point in the history
* refactor encoding into serialization

* Improve handling of non-blocking connect

* Dont accept tagged messages before start tag is settled

* Dont accept events before start tag is settled

* Port Zephyr example to new TcpIpChannel API

* Address feedback from Lasse
  • Loading branch information
erlingrj authored Oct 29, 2024
1 parent 8e35cc2 commit 6a64061
Show file tree
Hide file tree
Showing 14 changed files with 302 additions and 100 deletions.
22 changes: 17 additions & 5 deletions examples/zephyr/basic_federated/federated_receiver1/src/receiver.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/serialization.h"
#include <zephyr/net/net_ip.h>

#include <zephyr/drivers/gpio.h>
Expand Down Expand Up @@ -68,12 +69,25 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
NetworkChannel *chan = &self->chan.super;
chan->open_connection(chan);

lf_ret_t ret;
LF_DEBUG(ENV, "Recv: Connecting");
do {
lf_ret_t ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
LF_DEBUG(ENV, "Recv: Connected");
self->deserialize_hooks[0] = deserialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
Expand All @@ -88,8 +102,6 @@ typedef struct {
} MainRecv;

void MainRecv_ctor(MainRecv *self, Environment *env) {
env->wait_until(env, env->get_physical_time(env) +
MSEC(100)); // FIXME: This is a hack until we have proper connection setup
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

Expand Down
22 changes: 17 additions & 5 deletions examples/zephyr/basic_federated/federated_receiver2/src/receiver.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/serialization.h"
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include <zephyr/net/net_ip.h>
Expand Down Expand Up @@ -68,12 +69,25 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
NetworkChannel *chan = &self->chan.super;
chan->open_connection(chan);

lf_ret_t ret;
LF_DEBUG(ENV, "Recv: Connecting");
do {
lf_ret_t ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
LF_DEBUG(ENV, "Recv: Connected");
self->deserialize_hooks[0] = deserialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
Expand All @@ -88,8 +102,6 @@ typedef struct {
} MainRecv;

void MainRecv_ctor(MainRecv *self, Environment *env) {
env->wait_until(env, env->get_physical_time(env) +
MSEC(400)); // FIXME: This is a hack until we have proper connection setup
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

Expand Down
41 changes: 34 additions & 7 deletions examples/zephyr/basic_federated/federated_sender/src/sender.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/serialization.h"
#include <zephyr/net/net_ip.h>

#include <zephyr/drivers/gpio.h>
Expand Down Expand Up @@ -127,11 +128,25 @@ void SenderRecv1Bundle_ctor(SenderRecv1Bundle *self, Reactor *parent) {
printf("Sender: Bound 1\n");

// accept one connection
do {
ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
validate(ret == LF_OK);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
printf("Sender: Accepted 1\n");
self->serialize_hooks[0] = serialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
Expand All @@ -148,12 +163,24 @@ void SenderRecv2Bundle_ctor(SenderRecv2Bundle *self, Reactor *parent) {
printf("Sender: Bound 2\n");

// accept one connection
do {
ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
validate(ret == LF_OK);
printf("Sender: Accepted 2\n");
switch (ret) {
case LF_OK:
break;
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}

printf("Sender: Accepted 2\n");
self->serialize_hooks[0] = serialize_payload_default;
FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
}
Expand Down
30 changes: 0 additions & 30 deletions include/reactor-uc/encoding.h

This file was deleted.

5 changes: 4 additions & 1 deletion include/reactor-uc/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ typedef enum {
LF_OUT_OF_BOUNDS,
LF_NO_MEM,
LF_COULD_NOT_CONNECT,
LF_NETWORK_SETUP_FAILED
LF_NETWORK_SETUP_FAILED,
LF_TIMEOUT,
LF_TRY_AGAIN,
LF_IN_PROGRESS
} lf_ret_t;

/**
Expand Down
5 changes: 5 additions & 0 deletions include/reactor-uc/network_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ typedef struct NetworkChannel NetworkChannel;
struct NetworkChannel {
size_t dest_channel_id; // So that we can "address" one of several NetworkChannel's at the other end.
lf_ret_t (*open_connection)(NetworkChannel *self);

/** Try to connect to corresponding NetworkChannel on another federate.
* @return LF_OK if connection is established, LF_IN_PROGRESS if connection is in progress, LF_TRY_AGAIN if connection
* failed and should be retried, LF_ERR if connection failed and should not be retried.
*/
lf_ret_t (*try_connect)(NetworkChannel *self);
void (*close_connection)(NetworkChannel *self);
lf_ret_t (*send_blocking)(NetworkChannel *self, const FederateMessage *message);
Expand Down
1 change: 1 addition & 0 deletions include/reactor-uc/platform/posix/tcp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct TcpIpChannel {

int fd;
int client;
bool client_connect_in_progress;

const char *host;
unsigned short port;
Expand Down
1 change: 1 addition & 0 deletions include/reactor-uc/reactor-uc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "reactor-uc/macros.h"
#include "reactor-uc/platform.h"
#include "reactor-uc/port.h"
#include "reactor-uc/serialization.h"
#include "reactor-uc/reaction.h"
#include "reactor-uc/reactor.h"
#include "reactor-uc/tag.h"
Expand Down
13 changes: 13 additions & 0 deletions include/reactor-uc/serialization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#ifndef REACTOR_UC_SERIALIZATION_H
#define REACTOR_UC_SERIALIZATION_H

#include "proto/message.pb.h"
#include "reactor-uc/error.h"

int serialize_to_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size);
int deserialize_from_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size);

lf_ret_t deserialize_payload_default(void *user_struct, const unsigned char *msg_buf, size_t msg_size);

size_t serialize_payload_default(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf);
#endif // REACTOR_UC_SERIALIZATION_H
8 changes: 8 additions & 0 deletions src/action.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ lf_ret_t Action_schedule(Action *self, interval_t offset, const void *value) {

env->enter_critical_section(env);

// Dont accept events before we have started
// TODO: Do we instead need some flag to signal that we have started?
if (sched->start_time == NEVER) {
LF_ERR(TRIG, "Action %p cannot schedule events before start tag", self);
env->leave_critical_section(env);
return LF_ERR;
}

ret = self->super.payload_pool->allocate(self->super.payload_pool, &payload);
if (ret != LF_OK) {
return ret;
Expand Down
22 changes: 9 additions & 13 deletions src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/platform.h"

#undef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))

// TODO: Refactor so this function is available
void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size);

Expand Down Expand Up @@ -55,6 +52,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) {
tagged_msg->tag.time = sched->current_tag.time;
tagged_msg->tag.microstep = sched->current_tag.microstep;

assert(self->bundle->serialize_hooks[self->conn_id]);
size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size,
tagged_msg->payload.bytes);
tagged_msg->payload.size = msg_size;
Expand Down Expand Up @@ -158,6 +156,13 @@ void FederatedConnectionBundle_handle_tagged_msg(FederatedConnectionBundle *self
EventPayloadPool *pool = &input->payload_pool;
env->platform->enter_critical_section(env->platform);

// Verify that we have started executing and can actually handle it
if (sched->start_time == NEVER) {
LF_ERR(FED, "Received message before start tag. Dropping");
env->platform->leave_critical_section(env->platform);
return;
}

tag_t base_tag = ZERO_TAG;
if (input->type == PHYSICAL_CONNECTION) {
base_tag.time = env->get_physical_time(env);
Expand Down Expand Up @@ -222,16 +227,6 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
}
}

lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
memcpy(user_struct, msg_buf, MIN(msg_size, 832));
return LF_OK;
}

size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
memcpy(msg_buf, user_struct, MIN(user_struct_size, 832));
return MIN(user_struct_size, 832);
}

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
size_t inputs_size, FederatedOutputConnection **outputs,
Expand All @@ -246,6 +241,7 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa
self->serialize_hooks = serialize_hooks;

// Register callback function for message received.
// TODO: This should probably not happen here.
self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self);
}

Expand Down
Loading

0 comments on commit 6a64061

Please sign in to comment.