Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of non-blocking try-connect #106

Merged
merged 7 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions examples/zephyr/basic_federated/federated_receiver1/src/receiver.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/serialization.h"
#include <zephyr/net/net_ip.h>

#include <zephyr/drivers/gpio.h>
Expand Down Expand Up @@ -68,12 +69,25 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
NetworkChannel *chan = &self->chan.super;
chan->open_connection(chan);

lf_ret_t ret;
LF_DEBUG(ENV, "Recv: Connecting");
do {
lf_ret_t ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
LF_DEBUG(ENV, "Recv: Connected");
self->deserialize_hooks[0] = deserialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
Expand All @@ -88,8 +102,6 @@ typedef struct {
} MainRecv;

void MainRecv_ctor(MainRecv *self, Environment *env) {
env->wait_until(env, env->get_physical_time(env) +
MSEC(100)); // FIXME: This is a hack until we have proper connection setup
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

Expand Down
22 changes: 17 additions & 5 deletions examples/zephyr/basic_federated/federated_receiver2/src/receiver.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/serialization.h"
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include <zephyr/net/net_ip.h>
Expand Down Expand Up @@ -68,12 +69,25 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
NetworkChannel *chan = &self->chan.super;
chan->open_connection(chan);

lf_ret_t ret;
LF_DEBUG(ENV, "Recv: Connecting");
do {
lf_ret_t ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
LF_DEBUG(ENV, "Recv: Connected");
self->deserialize_hooks[0] = deserialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
Expand All @@ -88,8 +102,6 @@ typedef struct {
} MainRecv;

void MainRecv_ctor(MainRecv *self, Environment *env) {
env->wait_until(env, env->get_physical_time(env) +
MSEC(400)); // FIXME: This is a hack until we have proper connection setup
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

Expand Down
41 changes: 34 additions & 7 deletions examples/zephyr/basic_federated/federated_sender/src/sender.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/serialization.h"
#include <zephyr/net/net_ip.h>

#include <zephyr/drivers/gpio.h>
Expand Down Expand Up @@ -127,11 +128,25 @@ void SenderRecv1Bundle_ctor(SenderRecv1Bundle *self, Reactor *parent) {
printf("Sender: Bound 1\n");

// accept one connection
do {
ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
validate(ret == LF_OK);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
printf("Sender: Accepted 1\n");
self->serialize_hooks[0] = serialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
Expand All @@ -148,12 +163,24 @@ void SenderRecv2Bundle_ctor(SenderRecv2Bundle *self, Reactor *parent) {
printf("Sender: Bound 2\n");

// accept one connection
do {
ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
} while (ret != LF_OK);
validate(ret == LF_OK);
printf("Sender: Accepted 2\n");
switch (ret) {
case LF_OK:
break;
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}

printf("Sender: Accepted 2\n");
self->serialize_hooks[0] = serialize_payload_default;
FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
}
Expand Down
30 changes: 0 additions & 30 deletions include/reactor-uc/encoding.h

This file was deleted.

5 changes: 4 additions & 1 deletion include/reactor-uc/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ typedef enum {
LF_OUT_OF_BOUNDS,
LF_NO_MEM,
LF_COULD_NOT_CONNECT,
LF_NETWORK_SETUP_FAILED
LF_NETWORK_SETUP_FAILED,
LF_TIMEOUT,
LF_TRY_AGAIN,
LF_IN_PROGRESS
} lf_ret_t;

/**
Expand Down
5 changes: 5 additions & 0 deletions include/reactor-uc/network_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ typedef struct NetworkChannel NetworkChannel;
struct NetworkChannel {
size_t dest_channel_id; // So that we can "address" one of several NetworkChannel's at the other end.
lf_ret_t (*open_connection)(NetworkChannel *self);

/** Try to connect to corresponding NetworkChannel on another federate.
* @return LF_OK if connection is established, LF_IN_PROGRESS if connection is in progress, LF_TRY_AGAIN if connection
* failed and should be retried, LF_ERR if connection failed and should not be retried.
*/
lf_ret_t (*try_connect)(NetworkChannel *self);
void (*close_connection)(NetworkChannel *self);
lf_ret_t (*send_blocking)(NetworkChannel *self, const FederateMessage *message);
Expand Down
1 change: 1 addition & 0 deletions include/reactor-uc/platform/posix/tcp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct TcpIpChannel {

int fd;
int client;
bool client_connect_in_progress;

const char *host;
unsigned short port;
Expand Down
1 change: 1 addition & 0 deletions include/reactor-uc/reactor-uc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "reactor-uc/macros.h"
#include "reactor-uc/platform.h"
#include "reactor-uc/port.h"
#include "reactor-uc/serialization.h"
#include "reactor-uc/reaction.h"
#include "reactor-uc/reactor.h"
#include "reactor-uc/tag.h"
Expand Down
13 changes: 13 additions & 0 deletions include/reactor-uc/serialization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#ifndef REACTOR_UC_SERIALIZATION_H
#define REACTOR_UC_SERIALIZATION_H

#include "proto/message.pb.h"
#include "reactor-uc/error.h"

int serialize_to_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size);
int deserialize_from_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size);

lf_ret_t deserialize_payload_default(void *user_struct, const unsigned char *msg_buf, size_t msg_size);

size_t serialize_payload_default(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf);
#endif // REACTOR_UC_SERIALIZATION_H
8 changes: 8 additions & 0 deletions src/action.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ lf_ret_t Action_schedule(Action *self, interval_t offset, const void *value) {

env->enter_critical_section(env);

// Dont accept events before we have started
// TODO: Do we instead need some flag to signal that we have started?
if (sched->start_time == NEVER) {
LF_ERR(TRIG, "Action %p cannot schedule events before start tag", self);
env->leave_critical_section(env);
return LF_ERR;
}

ret = self->super.payload_pool->allocate(self->super.payload_pool, &payload);
if (ret != LF_OK) {
return ret;
Expand Down
22 changes: 9 additions & 13 deletions src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/platform.h"

#undef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))

// TODO: Refactor so this function is available
void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size);

Expand Down Expand Up @@ -55,6 +52,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) {
tagged_msg->tag.time = sched->current_tag.time;
tagged_msg->tag.microstep = sched->current_tag.microstep;

assert(self->bundle->serialize_hooks[self->conn_id]);
size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size,
tagged_msg->payload.bytes);
tagged_msg->payload.size = msg_size;
Expand Down Expand Up @@ -158,6 +156,13 @@ void FederatedConnectionBundle_handle_tagged_msg(FederatedConnectionBundle *self
EventPayloadPool *pool = &input->payload_pool;
env->platform->enter_critical_section(env->platform);

// Verify that we have started executing and can actually handle it
if (sched->start_time == NEVER) {
LF_ERR(FED, "Received message before start tag. Dropping");
env->platform->leave_critical_section(env->platform);
return;
}

tag_t base_tag = ZERO_TAG;
if (input->type == PHYSICAL_CONNECTION) {
base_tag.time = env->get_physical_time(env);
Expand Down Expand Up @@ -222,16 +227,6 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
}
}

lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
memcpy(user_struct, msg_buf, MIN(msg_size, 832));
return LF_OK;
}

size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
memcpy(msg_buf, user_struct, MIN(user_struct_size, 832));
return MIN(user_struct_size, 832);
}

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
size_t inputs_size, FederatedOutputConnection **outputs,
Expand All @@ -246,6 +241,7 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa
self->serialize_hooks = serialize_hooks;

// Register callback function for message received.
// TODO: This should probably not happen here.
self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self);
}

Expand Down
Loading
Loading