Skip to content

Commit

Permalink
Add RIOT federated CoAP example and fix CoAP channel
Browse files Browse the repository at this point in the history
  • Loading branch information
LasseRosenow committed Dec 9, 2024
1 parent e0e4121 commit ad40dcc
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 0 deletions.
29 changes: 29 additions & 0 deletions examples/riot/coap_federated/receiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# name of your application
APPLICATION = lf-coap-federated-receiver

# If no BOARD is found in the environment, use this default:
BOARD ?= native

# Comment this out to disable code in RIOT that does safety checking
# which is not needed in a production environment but helps in the
# development process:
DEVELHELP ?= 1

# Change this to 0 show compiler invocation lines by default:
QUIET ?= 1

# Enable reactor-uc features
# CFLAGS += -DNETWORK_CHANNEL_TCP_POSIX
CFLAGS += -DNETWORK_CHANNEL_COAP_RIOT
CFLAGS += -DEVENT_QUEUE_SIZE=32
CFLAGS += -DREACTION_QUEUE_SIZE=32

CFLAGS += -DTHREAD_STACKSIZE_DEFAULT=10000
CFLAGS += -DTHREAD_STACKSIZE_MAIN=10000
CFLAGS += -DISR_STACKSIZE=10000

# Configure CoAP retransmission timeout
CFLAGS+= -DCONFIG_GCOAP_NO_RETRANS_BACKOFF=1
CFLAGS+= -DCONFIG_COAP_ACK_TIMEOUT_MS=400

include $(CURDIR)/../../../../make/riot/riot.mk
111 changes: 111 additions & 0 deletions examples/riot/coap_federated/receiver/main.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/platform/riot/coap_udp_ip_channel.h"

#define REMOTE_ADDRESS "fe80::4c48:d8ff:fece:9a93"
#define REMOTE_PROTOCOL_FAMILY AF_INET6

typedef struct {
int size;
char msg[512];
} lf_msg_t;

lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
(void)msg_size;

lf_msg_t *msg = user_struct;
memcpy(&msg->size, msg_buf, sizeof(msg->size));
memcpy(msg->msg, msg_buf + sizeof(msg->size), msg->size);

return LF_OK;
}

DEFINE_REACTION_STRUCT(Receiver, r, 0);
DEFINE_REACTION_CTOR(Receiver, r, 0);
DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, lf_msg_t, 0);
DEFINE_INPUT_CTOR(Receiver, in, 1, 0, lf_msg_t, 0);

typedef struct {
Reactor super;
REACTION_INSTANCE(Receiver, r);
PORT_INSTANCE(Receiver, in, 1);
int cnt;
REACTOR_BOOKKEEPING_INSTANCES(1, 1, 0);
} Receiver;

DEFINE_REACTION_BODY(Receiver, r) {
SCOPE_SELF(Receiver);
SCOPE_ENV();
SCOPE_PORT(Receiver, in);
printf("Input triggered @ %" PRId64 " with %s size %d\n", env->get_elapsed_logical_time(env), in->value.msg,
in->value.size);
}

REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_external) {
REACTOR_CTOR_PREAMBLE();
REACTOR_CTOR(Receiver);
INITIALIZE_REACTION(Receiver, r);
INITIALIZE_INPUT(Receiver, in, 1, in_external);

// Register reaction as an effect of in
PORT_REGISTER_EFFECT(self->in, self->r, 1);
}

DEFINE_FEDERATED_INPUT_CONNECTION(Receiver, in, lf_msg_t, 5, MSEC(100), false);

typedef struct {
FederatedConnectionBundle super;
CoapUdpIpChannel channel;
FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0)
} FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);

FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
CoapUdpIpChannel_ctor(&self->channel, parent->env, REMOTE_ADDRESS, REMOTE_PROTOCOL_FAMILY);
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_msg_t);
}

typedef struct {
Reactor super;
CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
FEDERATE_BOOKKEEPING_INSTANCES(1);
CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
} MainRecv;

REACTOR_CTOR_SIGNATURE(MainRecv) {
REACTOR_CTOR(MainRecv);
FEDERATE_CTOR_PREAMBLE();
DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
BUNDLE_REGISTER_DOWNSTREAM(Receiver, Sender, receiver, in);
}

ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1, false)

void print_ip_addresses(void) {
gnrc_netif_t *netif = gnrc_netif_iter(NULL);
char addr_str[IPV6_ADDR_MAX_STR_LEN];

while (netif) {
size_t max_addr_count = 4;
ipv6_addr_t addrs[max_addr_count];
gnrc_netif_ipv6_addrs_get(netif, addrs, max_addr_count * sizeof(ipv6_addr_t));

for (size_t i = 0; i < 2; i++) {
if (ipv6_addr_to_str(addr_str, &addrs[i], sizeof(addr_str))) {
LF_INFO(NET, "IPv6 address: %s", addr_str);
}
}

netif = gnrc_netif_iter(netif);
}
}

int main() {
print_ip_addresses();
lf_start();
return 0;
}
29 changes: 29 additions & 0 deletions examples/riot/coap_federated/sender/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# name of your application
APPLICATION = lf-coap-federated-sender

# If no BOARD is found in the environment, use this default:
BOARD ?= native

# Comment this out to disable code in RIOT that does safety checking
# which is not needed in a production environment but helps in the
# development process:
DEVELHELP ?= 1

# Change this to 0 show compiler invocation lines by default:
QUIET ?= 1

# Enable reactor-uc features
# CFLAGS += -DNETWORK_CHANNEL_TCP_POSIX
CFLAGS += -DNETWORK_CHANNEL_COAP_RIOT
CFLAGS += -DEVENT_QUEUE_SIZE=32
CFLAGS += -DREACTION_QUEUE_SIZE=32

CFLAGS += -DTHREAD_STACKSIZE_DEFAULT=10000
CFLAGS += -DTHREAD_STACKSIZE_MAIN=10000
CFLAGS += -DISR_STACKSIZE=10000

# Configure CoAP retransmission timeout
CFLAGS+= -DCONFIG_GCOAP_NO_RETRANS_BACKOFF=1
CFLAGS+= -DCONFIG_COAP_ACK_TIMEOUT_MS=400

include $(CURDIR)/../../../../make/riot/riot.mk
120 changes: 120 additions & 0 deletions examples/riot/coap_federated/sender/main.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/platform/riot/coap_udp_ip_channel.h"

#define REMOTE_ADDRESS "fe80::2882:1bff:fe2d:1362"
#define REMOTE_PROTOCOL_FAMILY AF_INET6

typedef struct {
int size;
char msg[512];
} lf_msg_t;

size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
(void)user_struct_size;
const lf_msg_t *msg = user_struct;

memcpy(msg_buf, &msg->size, sizeof(msg->size));
memcpy(msg_buf + sizeof(msg->size), msg->msg, msg->size);

return sizeof(msg->size) + msg->size;
}

DEFINE_TIMER_STRUCT(Sender, t, 1, 0)
DEFINE_TIMER_CTOR(Sender, t, 1, 0)
DEFINE_REACTION_STRUCT(Sender, r, 1)
DEFINE_REACTION_CTOR(Sender, r, 0)
DEFINE_OUTPUT_STRUCT(Sender, out, 1, lf_msg_t)
DEFINE_OUTPUT_CTOR(Sender, out, 1)

typedef struct {
Reactor super;
TIMER_INSTANCE(Sender, t);
REACTION_INSTANCE(Sender, r);
PORT_INSTANCE(Sender, out, 1);
REACTOR_BOOKKEEPING_INSTANCES(1, 2, 0);
} Sender;

DEFINE_REACTION_BODY(Sender, r) {
SCOPE_SELF(Sender);
SCOPE_ENV();
SCOPE_PORT(Sender, out);

printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env));
lf_msg_t val;
strcpy(val.msg, "Hello From Sender");
val.size = sizeof("Hello From Sender");
lf_set(out, val);
}

REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Sender, OutputExternalCtorArgs *out_external) {
REACTOR_CTOR_PREAMBLE();
REACTOR_CTOR(Sender);
INITIALIZE_REACTION(Sender, r);
INITIALIZE_TIMER(Sender, t, MSEC(0), SEC(1));
INITIALIZE_OUTPUT(Sender, out, 1, out_external);

TIMER_REGISTER_EFFECT(self->t, self->r);
PORT_REGISTER_SOURCE(self->out, self->r, 1);
}

DEFINE_FEDERATED_OUTPUT_CONNECTION(Sender, out, lf_msg_t, 1)

typedef struct {
FederatedConnectionBundle super;
CoapUdpIpChannel channel;
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1);
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver);

FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
CoapUdpIpChannel_ctor(&self->channel, parent->env, REMOTE_ADDRESS, REMOTE_PROTOCOL_FAMILY);
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t);
}

// Reactor main
typedef struct {
Reactor super;
CHILD_REACTOR_INSTANCE(Sender, sender, 1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver);
FEDERATE_BOOKKEEPING_INSTANCES(1);
CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1);
CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);
} MainSender;

REACTOR_CTOR_SIGNATURE(MainSender) {
REACTOR_CTOR(MainSender);
FEDERATE_CTOR_PREAMBLE();
DEFINE_CHILD_OUTPUT_ARGS(sender, out, 1, 1);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver);
BUNDLE_REGISTER_UPSTREAM(Sender, Receiver, sender, out);
}
ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1, true)

void print_ip_addresses(void) {
gnrc_netif_t *netif = gnrc_netif_iter(NULL);
char addr_str[IPV6_ADDR_MAX_STR_LEN];

while (netif) {
size_t max_addr_count = 4;
ipv6_addr_t addrs[max_addr_count];
gnrc_netif_ipv6_addrs_get(netif, addrs, max_addr_count * sizeof(ipv6_addr_t));

for (size_t i = 0; i < 2; i++) {
if (ipv6_addr_to_str(addr_str, &addrs[i], sizeof(addr_str))) {
LF_INFO(NET, "IPv6 address: %s", addr_str);
}
}

netif = gnrc_netif_iter(netif);
}
}

int main() {
print_ip_addresses();
lf_start();
return 0;
}

0 comments on commit ad40dcc

Please sign in to comment.