diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c2c757d8..298a3ec7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,9 @@ jobs: # - name: Setup upterm session # uses: lhotari/action-upterm@v1 - name: Format check - run: make format-check + run: | + clang-format --version + make format-check - name: Run unit tests run: make unit-test @@ -51,4 +53,4 @@ jobs: lcov-file: build/coverage.info delete-old-comments: true github-token: ${{ secrets.GITHUB_TOKEN }} - if: ${{ github.event_name == 'pull_request' }} \ No newline at end of file + if: ${{ github.event_name == 'pull_request' }} diff --git a/CMakeLists.txt b/CMakeLists.txt index 738d05e5..bfcc7679 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,6 +63,9 @@ elseif (PLATFORM STREQUAL "ZEPHYR") elseif (PLATFORM STREQUAL "PICO") add_library(reactor-uc STATIC ${SOURCES}) target_link_libraries(reactor-uc PUBLIC pico_stdlib pico_sync) + +elseif (PLATFORM STREQUAL "PATMOS") + add_library(reactor-uc STATIC ${SOURCES}) else () message(FATAL_ERROR "No valid platform specified") endif () diff --git a/README.md b/README.md index 6230526e..1a151e00 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,8 @@ hello/build/app ### Zephyr Compile and run a simple test on Zephyr. This requires a correctly configured -Zehyr environment, with West installed in a Python virtual environment which is -activated. Inspect `.github/actions/zephyr/action.yml` for an example of how to setup your Zephyr workspace. +Zehyr environment, with West installed in a Python virtual environment, is +activated. Inspect `.github/actions/zephyr/action.yml` for an example of setting up your Zephyr workspace. First a simple HelloWorld on the `native_posix` target: ```shell @@ -48,7 +48,7 @@ cd examples/zephyr/hello west build -b native_posix -p always -t run ``` -Then a simple blinky targeting NXP FRDM K64F. This will run with most boards supporting Zephyr that has a user LED. +Then a simple blinky targeting NXP FRDM K64F. This will run with most boards supporting Zephyr that have a user LED. ```shell cd examples/zephyr/blinky west build -b frdm_k64f -p always @@ -76,7 +76,7 @@ make ``` ### Lingua Franca -Reactor-uc includes a limited version of the Lingua Franca Compiler (lfc) found in `~/lfc`. In the future, the +Reactor-uc includes a limited version of the Lingua Franca Compiler (lfc) found in `lfc`. In the future, the `reactor-uc` specific code-generation will be merged back upstream. By sourcing `env.bash`, `env.fish` or `env.zsh` the Lingua Franca Compiler will be aliased by `lfcg`. @@ -93,8 +93,8 @@ test/lf/bin/HelloUc ### Project templates To start using reactor-uc in a project we recommend using one of our project templates: -- [Zephyr](https://github.com/lf-lang/lf-west-template/tree/reactor-uc) -- [Polulu 3Pi Robot](https://github.com/lf-lang/lf-3pi-template/tree/reactor-uc) +- [Zephyr](https://github.com/lf-lang/lf-zephyr-uc-template) +- [RIOT](https://github.com/lf-lang/lf-riot-uc-template/) - [Raspberry Pi Pico](https://github.com/lf-lang/lf-pico-template/tree/reactor-uc) diff --git a/examples/riot/README.md b/examples/riot/README.md new file mode 100644 index 00000000..a81fb92a --- /dev/null +++ b/examples/riot/README.md @@ -0,0 +1,96 @@ +# RIOT Examples + +This doc explains how to compile and run the various RIOT OS examples. + +## Setup RIOT Environment + +Make sure that the environment variable `RIOTBASE` points to a `RIOT` codebase. + +## Build and Run + +### Blinky + +```shell +cd blinky +make BOARD=native all term +``` + +### Hello + +```shell +cd hello +make BOARD=native all term +``` + +### CoAP Federated + +The federated example using CoAP channels needs to be run using 2 terminals. +Make sure to set the `PORT` environment variable to the correct `tap` interface such as `tap0` or `tap1` as can be seen in the code below. + +#### Preparation + +First you need to create the `tap` interfaces so that the `sender` and `receiver` application can communicate through the (linux) host. + +```shell +sudo $RIOTBASE/dist/tools/tapsetup/tapsetup +``` + +#### Get IPv6 address of receiver + +Enter the directory of the `sender` application: + +```shell +cd coap_federated/sender +``` + +Get the IP address of the `receiver` by specifying the `PORT=tap1` and `ONLY_PRINT_IP=1` environment variables: + +*If the program returns more than one IP-Address then select the one that starts with `fe80`*. + +```shell +make ONLY_PRINT_IP=1 BOARD=native PORT=tap1 all term +``` + +The resulting program will print out the IPv6 address of `tap1` and terminate. +This address must be used when starting the sender below. + + +#### Get IPv6 address of sender + +Enter the directory of the `receiver` application: + +```shell +cd coap_federated/receiver +``` + +Get the IP address of the `sender` by specifying the `PORT=tap0` and `ONLY_PRINT_IP=1` environment variables: + +*If the program returns more than one IP-Address then select the one that starts with `fe80`*. + +```shell +make ONLY_PRINT_IP=1 BOARD=native PORT=tap0 all term +``` + +The resulting program will print out the IPv6 address of `tap0` and terminate. +This address must be used when starting the receiver below. + +#### Start the applications + +##### Sender +Start the sender with `PORT=tap0`, make sure to replace `REMOTE_ADDRESS` with +the address of `tap1` that you found above. + +```shell +cd sender +make REMOTE_ADDRESS=fe80::8cc3:33ff:febb:1b3 BOARD=native PORT=tap0 all term +``` + +##### Receiver + +Start the receiver with `PORT=tap1`, make sure to replace `REMOTE_ADDRESS` with +the address of `tap0` that you found above. + +```shell +cd receiver +make REMOTE_ADDRESS=fe80::44e5:1bff:fee4:dac8 BOARD=native PORT=tap1 all term +``` diff --git a/examples/riot/coap_federated/receiver/Makefile b/examples/riot/coap_federated/receiver/Makefile new file mode 100755 index 00000000..7a6d22cb --- /dev/null +++ b/examples/riot/coap_federated/receiver/Makefile @@ -0,0 +1,45 @@ +# name of your application +APPLICATION = lf-coap-federated-receiver + +# This has to be the absolute path to the RIOT base directory: +RIOTBASE ?= $(CURDIR)/../../../../../RIOT + +# If no BOARD is found in the environment, use this default: +BOARD ?= native + +# Comment this out to disable code in RIOT that does safety checking +# which is not needed in a production environment but helps in the +# development process: +DEVELHELP ?= 1 + +# Change this to 0 show compiler invocation lines by default: +QUIET ?= 1 + +# Enable reactor-uc features +CFLAGS += -DNETWORK_CHANNEL_COAP_RIOT +REACTION_QUEUE_SIZE = 32 +EVENT_QUEUE_SIZE = 32 + +CFLAGS += -DTHREAD_STACKSIZE_DEFAULT=10000 +CFLAGS += -DTHREAD_STACKSIZE_MAIN=10000 +CFLAGS += -DISR_STACKSIZE=10000 + +# Configure CoAP retransmission timeout +CFLAGS += -DCONFIG_GCOAP_NO_RETRANS_BACKOFF=1 +CFLAGS += -DCONFIG_COAP_ACK_TIMEOUT_MS=400 +CFLAGS += -DCONFIG_COAP_MAX_RETRANSMIT=4 + +# Check if ONLY_PRINT_IP is defined +# If ONLY_PRINT_IP is defined the REMOTE_ADDRESS is not needed +ifdef ONLY_PRINT_IP + # ONLY_PRINT_IP is defined => Set CFLAGS for it + CFLAGS += -DONLY_PRINT_IP=$(ONLY_PRINT_IP) +else ifdef REMOTE_ADDRESS + # REMOTE_ADDRESS is defined => Set CFLAGS for it + CFLAGS += -DREMOTE_ADDRESS=\"$(REMOTE_ADDRESS)\" +else + # Neither is defined + $(error Either define REMOTE_ADDRESS or set ONLY_PRINT_IP=1 to print the IP-Address of this device.) +endif + +include $(CURDIR)/../../../../make/riot/riot.mk diff --git a/examples/riot/coap_federated/receiver/main.c b/examples/riot/coap_federated/receiver/main.c new file mode 100755 index 00000000..9c50219c --- /dev/null +++ b/examples/riot/coap_federated/receiver/main.c @@ -0,0 +1,117 @@ +#include "reactor-uc/platform/riot/coap_udp_ip_channel.h" +#include "reactor-uc/reactor-uc.h" + +#ifndef REMOTE_ADDRESS +#define REMOTE_ADDRESS "fe80::44e5:1bff:fee4:dac8" +#endif + +#define REMOTE_PROTOCOL_FAMILY AF_INET6 + +typedef struct { + int size; + char msg[512]; +} lf_msg_t; + +lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { + (void)msg_size; + + lf_msg_t *msg = user_struct; + memcpy(&msg->size, msg_buf, sizeof(msg->size)); + memcpy(msg->msg, msg_buf + sizeof(msg->size), msg->size); + + return LF_OK; +} + +LF_DEFINE_REACTION_STRUCT(Receiver, r, 0) +LF_DEFINE_REACTION_CTOR(Receiver, r, 0) +LF_DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, lf_msg_t, 0) +LF_DEFINE_INPUT_CTOR(Receiver, in, 1, 0, lf_msg_t, 0) + +typedef struct { + Reactor super; + LF_REACTION_INSTANCE(Receiver, r); + LF_PORT_INSTANCE(Receiver, in, 1); + int cnt; + LF_REACTOR_BOOKKEEPING_INSTANCES(1, 1, 0); +} Receiver; + +LF_DEFINE_REACTION_BODY(Receiver, r) { + LF_SCOPE_SELF(Receiver); + LF_SCOPE_ENV(); + LF_SCOPE_PORT(Receiver, in); + printf("Input triggered @ %" PRId64 " with %s size %d\n", env->get_elapsed_logical_time(env), in->value.msg, + in->value.size); +} + +LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_external) { + LF_REACTOR_CTOR_PREAMBLE(); + LF_REACTOR_CTOR(Receiver); + LF_INITIALIZE_REACTION(Receiver, r); + LF_INITIALIZE_INPUT(Receiver, in, 1, in_external); + + // Register reaction as an effect of in + LF_PORT_REGISTER_EFFECT(self->in, self->r, 1); +} + +LF_DEFINE_FEDERATED_INPUT_CONNECTION(Receiver, in, lf_msg_t, 5, MSEC(100), false) + +typedef struct { + FederatedConnectionBundle super; + CoapUdpIpChannel channel; + LF_FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in); + LF_FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0) +} LF_FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender); + +LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) { + LF_FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE(); + CoapUdpIpChannel_ctor(&self->channel, parent->env, REMOTE_ADDRESS, REMOTE_PROTOCOL_FAMILY); + LF_FEDERATED_CONNECTION_BUNDLE_CALL_CTOR(); + LF_INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_msg_t); +} + +typedef struct { + Reactor super; + LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1); + LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender); + LF_FEDERATE_BOOKKEEPING_INSTANCES(1); + LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0); +} MainRecv; + +LF_REACTOR_CTOR_SIGNATURE(MainRecv) { + LF_REACTOR_CTOR(MainRecv); + LF_FEDERATE_CTOR_PREAMBLE(); + LF_DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1); + LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]); + LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender); + LF_BUNDLE_REGISTER_DOWNSTREAM(Receiver, Sender, receiver, in); +} + +LF_ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1, false) + +void print_ip_addresses(void) { + gnrc_netif_t *netif = gnrc_netif_iter(NULL); + char addr_str[IPV6_ADDR_MAX_STR_LEN]; + + while (netif) { + size_t max_addr_count = 4; + ipv6_addr_t addrs[max_addr_count]; + gnrc_netif_ipv6_addrs_get(netif, addrs, max_addr_count * sizeof(ipv6_addr_t)); + + for (size_t i = 0; i < 2; i++) { + if (ipv6_addr_to_str(addr_str, &addrs[i], sizeof(addr_str))) { + LF_INFO(NET, "IPv6 address: %s", addr_str); + } + } + + netif = gnrc_netif_iter(netif); + } +} + +int main() { +#ifdef ONLY_PRINT_IP + print_ip_addresses(); +#else + lf_start(); +#endif + return 0; +} diff --git a/examples/riot/coap_federated/sender/Makefile b/examples/riot/coap_federated/sender/Makefile new file mode 100755 index 00000000..ad03fccd --- /dev/null +++ b/examples/riot/coap_federated/sender/Makefile @@ -0,0 +1,45 @@ +# name of your application +APPLICATION = lf-coap-federated-sender + +# This has to be the absolute path to the RIOT base directory: +RIOTBASE ?= $(CURDIR)/../../../../../RIOT + +# If no BOARD is found in the environment, use this default: +BOARD ?= native + +# Comment this out to disable code in RIOT that does safety checking +# which is not needed in a production environment but helps in the +# development process: +DEVELHELP ?= 1 + +# Change this to 0 show compiler invocation lines by default: +QUIET ?= 1 + +# Enable reactor-uc features +CFLAGS += -DNETWORK_CHANNEL_COAP_RIOT +REACTION_QUEUE_SIZE = 32 +EVENT_QUEUE_SIZE = 32 + +CFLAGS += -DTHREAD_STACKSIZE_DEFAULT=10000 +CFLAGS += -DTHREAD_STACKSIZE_MAIN=10000 +CFLAGS += -DISR_STACKSIZE=10000 + +# Configure CoAP retransmission timeout +CFLAGS += -DCONFIG_GCOAP_NO_RETRANS_BACKOFF=1 +CFLAGS += -DCONFIG_COAP_ACK_TIMEOUT_MS=400 +CFLAGS += -DCONFIG_COAP_MAX_RETRANSMIT=4 + +# Check if ONLY_PRINT_IP is defined +# If ONLY_PRINT_IP is defined the REMOTE_ADDRESS is not needed +ifdef ONLY_PRINT_IP + # ONLY_PRINT_IP is defined => Set CFLAGS for it + CFLAGS += -DONLY_PRINT_IP=$(ONLY_PRINT_IP) +else ifdef REMOTE_ADDRESS + # REMOTE_ADDRESS is defined => Set CFLAGS for it + CFLAGS += -DREMOTE_ADDRESS=\"$(REMOTE_ADDRESS)\" +else + # Neither is defined + $(error Either define REMOTE_ADDRESS or set ONLY_PRINT_IP=1 to print the IP-Address of this device) +endif + +include $(CURDIR)/../../../../make/riot/riot.mk diff --git a/examples/riot/coap_federated/sender/main.c b/examples/riot/coap_federated/sender/main.c new file mode 100755 index 00000000..9a0e47b3 --- /dev/null +++ b/examples/riot/coap_federated/sender/main.c @@ -0,0 +1,127 @@ +#include "reactor-uc/reactor-uc.h" +#include "reactor-uc/platform/riot/coap_udp_ip_channel.h" + +#ifndef REMOTE_ADDRESS +#define REMOTE_ADDRESS "fe80::8cc3:33ff:febb:1b3" +#endif + +#define REMOTE_PROTOCOL_FAMILY AF_INET6 + +typedef struct { + int size; + char msg[512]; +} lf_msg_t; + +size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + (void)user_struct_size; + const lf_msg_t *msg = user_struct; + + memcpy(msg_buf, &msg->size, sizeof(msg->size)); + memcpy(msg_buf + sizeof(msg->size), msg->msg, msg->size); + + return sizeof(msg->size) + msg->size; +} + +LF_DEFINE_TIMER_STRUCT(Sender, t, 1, 0) +LF_DEFINE_TIMER_CTOR(Sender, t, 1, 0) +LF_DEFINE_REACTION_STRUCT(Sender, r, 1) +LF_DEFINE_REACTION_CTOR(Sender, r, 0) +LF_DEFINE_OUTPUT_STRUCT(Sender, out, 1, lf_msg_t) +LF_DEFINE_OUTPUT_CTOR(Sender, out, 1) + +typedef struct { + Reactor super; + LF_TIMER_INSTANCE(Sender, t); + LF_REACTION_INSTANCE(Sender, r); + LF_PORT_INSTANCE(Sender, out, 1); + LF_REACTOR_BOOKKEEPING_INSTANCES(1, 2, 0); +} Sender; + +LF_DEFINE_REACTION_BODY(Sender, r) { + LF_SCOPE_SELF(Sender); + LF_SCOPE_ENV(); + LF_SCOPE_PORT(Sender, out); + + printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env)); + lf_msg_t val; + strcpy(val.msg, "Hello From Sender"); + val.size = sizeof("Hello From Sender"); + lf_set(out, val); +} + +LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Sender, OutputExternalCtorArgs *out_external) { + LF_REACTOR_CTOR_PREAMBLE(); + LF_REACTOR_CTOR(Sender); + LF_INITIALIZE_REACTION(Sender, r); + LF_INITIALIZE_TIMER(Sender, t, MSEC(0), SEC(1)); + LF_INITIALIZE_OUTPUT(Sender, out, 1, out_external); + + LF_TIMER_REGISTER_EFFECT(self->t, self->r); + LF_PORT_REGISTER_SOURCE(self->out, self->r, 1); +} + +LF_DEFINE_FEDERATED_OUTPUT_CONNECTION(Sender, out, lf_msg_t, 1) + +typedef struct { + FederatedConnectionBundle super; + CoapUdpIpChannel channel; + LF_FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out); + LF_FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1); +} LF_FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver); + +LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) { + LF_FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE(); + CoapUdpIpChannel_ctor(&self->channel, parent->env, REMOTE_ADDRESS, REMOTE_PROTOCOL_FAMILY); + LF_FEDERATED_CONNECTION_BUNDLE_CALL_CTOR(); + LF_INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t); +} + +// Reactor main +typedef struct { + Reactor super; + LF_CHILD_REACTOR_INSTANCE(Sender, sender, 1); + LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver); + LF_FEDERATE_BOOKKEEPING_INSTANCES(1); + LF_CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1); + LF_CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0); + LF_CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0); +} MainSender; + +LF_REACTOR_CTOR_SIGNATURE(MainSender) { + LF_REACTOR_CTOR(MainSender); + LF_FEDERATE_CTOR_PREAMBLE(); + LF_DEFINE_CHILD_OUTPUT_ARGS(sender, out, 1, 1); + LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]); + LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver); + LF_BUNDLE_REGISTER_UPSTREAM(Sender, Receiver, sender, out); +} + +LF_ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1, true) + +void print_ip_addresses(void) { + gnrc_netif_t *netif = gnrc_netif_iter(NULL); + char addr_str[IPV6_ADDR_MAX_STR_LEN]; + + while (netif) { + size_t max_addr_count = 4; + ipv6_addr_t addrs[max_addr_count]; + gnrc_netif_ipv6_addrs_get(netif, addrs, max_addr_count * sizeof(ipv6_addr_t)); + + for (size_t i = 0; i < 2; i++) { + if (ipv6_addr_to_str(addr_str, &addrs[i], sizeof(addr_str))) { + LF_INFO(NET, "IPv6 address: %s", addr_str); + } + } + + netif = gnrc_netif_iter(netif); + } +} + +int main() { +#ifdef ONLY_PRINT_IP + print_ip_addresses(); +#else + lf_start(); +#endif + return 0; +} diff --git a/include/reactor-uc/macros.h b/include/reactor-uc/macros.h index 73282a67..f2697079 100644 --- a/include/reactor-uc/macros.h +++ b/include/reactor-uc/macros.h @@ -677,12 +677,10 @@ typedef struct FederatedInputConnection FederatedInputConnection; env.scheduler->leader = IsLeader; \ env.has_async_events = HasInputs; \ \ - env.enter_critical_section(&env); \ FederateName##_ctor(&main_reactor, NULL, &env); \ env.net_bundles_size = NumBundles; \ env.net_bundles = (FederatedConnectionBundle **)&main_reactor._bundles; \ env.assemble(&env); \ - env.leave_critical_section(&env); \ env.start(&env); \ lf_exit(); \ } diff --git a/include/reactor-uc/platform/patmos/patmos.h b/include/reactor-uc/platform/patmos/patmos.h new file mode 100644 index 00000000..dc7ece09 --- /dev/null +++ b/include/reactor-uc/platform/patmos/patmos.h @@ -0,0 +1,13 @@ +#ifndef REACTOR_UC_PLATFORM_PATMOS_H +#define REACTOR_UC_PLATFORM_PATMOS_H + +#include "reactor-uc/platform.h" +#include "stdbool.h" + +typedef struct { + Platform super; + bool async_event; +} PlatformPatmos; + +void PlatformPatmos_ctor(Platform *self); +#endif diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index 43a58bc2..d9ecb3a8 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -26,7 +26,7 @@ struct TcpIpChannel { int fd; int client; int send_failed_event_fds; // These file descriptors are used to signal the recv select to stop blocking - int terminate_event_fds; + int terminate_event_fds[2]; NetworkChannelState state; pthread_mutex_t mutex; diff --git a/make/riot/riot-lfc.mk b/make/riot/riot-lfc.mk index dacdef5c..a2591fba 100644 --- a/make/riot/riot-lfc.mk +++ b/make/riot/riot-lfc.mk @@ -11,16 +11,23 @@ APPLICATION ?= $(LF_MAIN) # Path of generated lf c-code LF_SRC_GEN_PATH ?= $(CURDIR)/src-gen/$(LF_MAIN) -# Include the Makefile of the generated target application -include $(LF_SRC_GEN_PATH)/Makefile - -# Include generated c files -SRC += $(patsubst %, $(LF_SRC_GEN_PATH)/%, $(LFC_GEN_SOURCES)) - -# Include generated main file -SRC += $(LF_SRC_GEN_PATH)/${LFC_GEN_MAIN} - -# Include generated h files -CFLAGS += -I$(LF_SRC_GEN_PATH) +# Only include generated files if build target is not "clean" +# In this case the src-gen folder was deleted +ifeq ($(MAKECMDGOALS),clean) + # Delete src-gen folder if build target is "clean" + _ := $(shell rm -rf $(LF_SRC_GEN_PATH)) +else + # Include the Makefile of the generated target application + include $(LF_SRC_GEN_PATH)/Makefile + + # Include generated c files + SRC += $(patsubst %, $(LF_SRC_GEN_PATH)/%, $(LFC_GEN_SOURCES)) + + # Include generated main file + SRC += $(LF_SRC_GEN_PATH)/${LFC_GEN_MAIN} + + # Include generated h files + CFLAGS += -I$(LF_SRC_GEN_PATH) +endif include $(RIOT_MK_DIR)/riot.mk diff --git a/src/federated.c b/src/federated.c index ddbb3215..9452a185 100644 --- a/src/federated.c +++ b/src/federated.c @@ -327,4 +327,4 @@ void FederatedConnectionBundle_validate(FederatedConnectionBundle *bundle) { validate(bundle->serialize_hooks[i]); validate(bundle->outputs[i]->super.super.parent); } -} \ No newline at end of file +} diff --git a/src/network_channel.c b/src/network_channel.c index dc9c91cd..304db2c3 100644 --- a/src/network_channel.c +++ b/src/network_channel.c @@ -28,6 +28,10 @@ #error "NETWORK_POSIC_TCP not supported on FlexPRET" #endif +#elif defined(PLATFORM_PATMOS) +#ifdef NETWORK_CHANNEL_TCP_POSIX +#error "NETWORK_POSIX_TCP not supported on Patmos" +#endif #else #error "Platform not supported" #endif diff --git a/src/platform.c b/src/platform.c index b81736cb..b5b506a8 100644 --- a/src/platform.c +++ b/src/platform.c @@ -1,3 +1,4 @@ + #if defined(PLATFORM_POSIX) #include "platform/posix/posix.c" #elif defined(PLATFORM_RIOT) @@ -8,6 +9,8 @@ #include "platform/flexpret/flexpret.c" #elif defined(PLATFORM_PICO) #include "platform/pico/pico.c" +#elif defined(PLATFORM_PATMOS) +#include "platform/patmos/patmos.c" #else #error "NO PLATFORM SPECIFIED" #endif diff --git a/src/platform/patmos/patmos.c b/src/platform/patmos/patmos.c new file mode 100644 index 00000000..7ca6bde0 --- /dev/null +++ b/src/platform/patmos/patmos.c @@ -0,0 +1,109 @@ +#include "reactor-uc/platform/patmos/patmos.h" +#include +#include +#include + +#include +#include + +static PlatformPatmos platform; + +void Platform_vprintf(const char *fmt, va_list args) { vprintf(fmt, args); } + +lf_ret_t PlatformPatmos_initialize(Platform *self) { + (void)self; + return LF_OK; +} + +instant_t PlatformPatmos_get_physical_time(Platform *self) { + (void)self; + return USEC(get_cpu_usecs()); +} + +lf_ret_t PlatformPatmos_wait_until_interruptible(Platform *untyped_self, instant_t wakeup_time) { + PlatformPatmos *self = (PlatformPatmos *)untyped_self; + self->async_event = false; + untyped_self->leave_critical_section(untyped_self); // turing on interrupts + + instant_t now = untyped_self->get_physical_time(untyped_self); + + // Do busy sleep + do { + now = untyped_self->get_physical_time(untyped_self); + } while ((now < wakeup_time) && !self->async_event); + + untyped_self->enter_critical_section(untyped_self); + + if (self->async_event) { + self->async_event = false; + return LF_ERR; + } else { + return LF_OK; + } + + interval_t sleep_duration = wakeup_time - untyped_self->get_physical_time(untyped_self); + if (sleep_duration < 0) { + return LF_OK; + } + + untyped_self->leave_critical_section(untyped_self); + + return LF_OK; +} + +lf_ret_t PlatformPatmos_wait_until(Platform *untyped_self, instant_t wakeup_time) { + interval_t sleep_duration = wakeup_time - untyped_self->get_physical_time(untyped_self); + if (sleep_duration < 0) { + return LF_OK; + } + + instant_t now = untyped_self->get_physical_time(untyped_self); + + // Do busy sleep + do { + now = untyped_self->get_physical_time(untyped_self); + } while (now < wakeup_time); + return LF_OK; +} + +lf_ret_t PlatformPatmos_wait_for(Platform *self, interval_t duration) { + (void)self; + if (duration <= 0) { + return LF_OK; + } + + instant_t now = self->get_physical_time(self); + instant_t wakeup = now + duration; + + // Do busy sleep + do { + now = self->get_physical_time(self); + } while ((now < wakeup)); + + return LF_OK; +} + +void PlatformPatmos_leave_critical_section(Platform *self) { + (void)self; + intr_enable(); +} + +void PlatformPatmos_enter_critical_section(Platform *self) { + (void)self; + intr_disable(); +} + +void PlatformPatmos_new_async_event(Platform *self) { ((PlatformPatmos *)self)->async_event = true; } + +void Platform_ctor(Platform *self) { + self->initialize = PlatformPatmos_initialize; + self->enter_critical_section = PlatformPatmos_enter_critical_section; + self->leave_critical_section = PlatformPatmos_leave_critical_section; + self->get_physical_time = PlatformPatmos_get_physical_time; + self->wait_until = PlatformPatmos_wait_until; + self->wait_for = PlatformPatmos_wait_for; + self->wait_until_interruptible = PlatformPatmos_wait_until_interruptible; + self->new_async_event = PlatformPatmos_new_async_event; +} + +Platform *Platform_new(void) { return (Platform *)&platform; } diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 9d3ddfa4..2a3cb338 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -101,9 +101,8 @@ static lf_ret_t _TcpIpChannel_reset_socket(TcpIpChannel *self) { return LF_ERR; } - self->send_failed_event_fds = eventfd(0, EFD_NONBLOCK); - if (self->send_failed_event_fds == -1) { - TCP_IP_CHANNEL_ERR("Failed to initialize event file descriptor"); + if (socketpair(AF_UNIX, SOCK_STREAM, 0, self->send_failed_event_fds) < 0) { + TCP_IP_CHANNEL_ERR("Failed to initialize \"send_failed\" socketpair file descriptors"); return LF_ERR; } @@ -279,7 +278,7 @@ static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const F switch (errno) { case ETIMEDOUT: case ENOTCONN: - bytes_written = eventfd_write(self->send_failed_event_fds, 1); + ssize_t bytes_written = write(self->send_failed_event_fds[1], "X", 1); if (bytes_written == -1) { TCP_IP_CHANNEL_ERR("Failed informing worker thread, that send_blocking failed, errno=%d", errno); } @@ -443,13 +442,13 @@ static void *_TcpIpChannel_worker_thread(void *untyped_self) { // Set up the file descriptor set FD_ZERO(&readfds); FD_SET(socket, &readfds); - FD_SET(self->send_failed_event_fds, &readfds); + FD_SET(self->send_failed_event_fds[0], &readfds); FD_SET(self->terminate_event_fds, &readfds); // Determine the maximum file descriptor for select max_fd = socket; - if (self->send_failed_event_fds > max_fd) - max_fd = self->send_failed_event_fds; + if (self->send_failed_event_fds[0] > max_fd) + max_fd = self->send_failed_event_fds[0]; if (self->terminate_event_fds > max_fd) max_fd = self->terminate_event_fds; @@ -470,7 +469,7 @@ static void *_TcpIpChannel_worker_thread(void *untyped_self) { } else if (ret == LF_ERR) { /* Return to see what the error was by inspecting the network channel state.*/ } - } else if (FD_ISSET(self->send_failed_event_fds, &readfds)) { + } else if (FD_ISSET(self->send_failed_event_fds[0], &readfds)) { TCP_IP_CHANNEL_DEBUG("Select -> cancelled by send_block failure"); _TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_LOST_CONNECTION); } else if (FD_ISSET(self->terminate_event_fds, &readfds)) { @@ -569,7 +568,6 @@ void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, u self->client = 0; self->fd = 0; self->state = NETWORK_CHANNEL_STATE_UNINITIALIZED; - self->send_failed_event_fds = 0; self->super.is_connected = TcpIpChannel_is_connected; self->super.open_connection = TcpIpChannel_open_connection; diff --git a/src/platform/riot/coap_udp_ip_channel.c b/src/platform/riot/coap_udp_ip_channel.c index 5d7dc68b..758cab37 100644 --- a/src/platform/riot/coap_udp_ip_channel.c +++ b/src/platform/riot/coap_udp_ip_channel.c @@ -5,6 +5,7 @@ #include "net/gcoap.h" #include "net/sock/util.h" +#include "thread.h" #include #define COAP_UDP_IP_CHANNEL_ERR(fmt, ...) LF_ERR(NET, "CoapUdpIpChannel: " fmt, ##__VA_ARGS__) @@ -12,12 +13,11 @@ #define COAP_UDP_IP_CHANNEL_INFO(fmt, ...) LF_INFO(NET, "CoapUdpIpChannel: " fmt, ##__VA_ARGS__) #define COAP_UDP_IP_CHANNEL_DEBUG(fmt, ...) LF_DEBUG(NET, "CoapUdpIpChannel: " fmt, ##__VA_ARGS__) +char _connection_thread_stack[THREAD_STACKSIZE_MAIN]; +int _connection_thread_pid = 0; static bool _is_globals_initialized = false; static Environment *_env; -// Forward declarations -static lf_ret_t _CoapUdpIpChannel_client_send_connect_message(CoapUdpIpChannel *self); - static void _CoapUdpIpChannel_update_state(CoapUdpIpChannel *self, NetworkChannelState new_state) { COAP_UDP_IP_CHANNEL_DEBUG("Update state: %s => %s", NetworkChannel_state_to_string(self->state), NetworkChannel_state_to_string(new_state)); @@ -26,13 +26,22 @@ static void _CoapUdpIpChannel_update_state(CoapUdpIpChannel *self, NetworkChanne NetworkChannelState old_state = self->state; // Update the state of the channel to its new state + mutex_lock(&self->state_mutex); self->state = new_state; + mutex_unlock(&self->state_mutex); // Inform runtime about new state if it changed from or to NETWORK_CHANNEL_STATE_CONNECTED if ((old_state == NETWORK_CHANNEL_STATE_CONNECTED && new_state != NETWORK_CHANNEL_STATE_CONNECTED) || (old_state != NETWORK_CHANNEL_STATE_CONNECTED && new_state == NETWORK_CHANNEL_STATE_CONNECTED)) { _env->platform->new_async_event(_env->platform); } + + // Let connection thread evaluate new state of this channel + msg_t msg = { + .type = 0, + .content.ptr = self, + }; + msg_try_send(&msg, _connection_thread_pid); } static void _CoapUdpIpChannel_update_state_if_not(CoapUdpIpChannel *self, NetworkChannelState new_state, @@ -40,7 +49,8 @@ static void _CoapUdpIpChannel_update_state_if_not(CoapUdpIpChannel *self, Networ // Update the state of the channel itself mutex_lock(&self->state_mutex); if (self->state != if_not) { - COAP_UDP_IP_CHANNEL_DEBUG("Update state: %d => %d", self->state, new_state); + COAP_UDP_IP_CHANNEL_DEBUG("Update state: %s => %s", NetworkChannel_state_to_string(self->state), + NetworkChannel_state_to_string(new_state)); self->state = new_state; } mutex_unlock(&self->state_mutex); @@ -71,8 +81,10 @@ static CoapUdpIpChannel *_CoapUdpIpChannel_get_coap_channel_by_remote(const sock } } - COAP_UDP_IP_CHANNEL_ERR("Channel not found by socket"); + char remote_addr_str[IPV6_ADDR_MAX_STR_LEN]; + sock_udp_ep_fmt(remote, remote_addr_str, NULL); + COAP_UDP_IP_CHANNEL_ERR("Channel not found by socket (addr=%s)", remote_addr_str); return NULL; } @@ -84,12 +96,13 @@ static bool _CoapUdpIpChannel_send_coap_message(sock_udp_ep_t *remote, char *pat coap_hdr_set_type(pdu.hdr, COAP_TYPE_CON); ssize_t bytes_sent = gcoap_req_send(buf, len, remote, NULL, resp_handler, NULL, GCOAP_SOCKET_TYPE_UDP); - COAP_UDP_IP_CHANNEL_DEBUG("Sending %d bytes", bytes_sent); if (bytes_sent > 0) { - COAP_UDP_IP_CHANNEL_DEBUG("Message sent"); + COAP_UDP_IP_CHANNEL_DEBUG("Sending %d bytes", bytes_sent); + COAP_UDP_IP_CHANNEL_DEBUG("CoAP Message sent"); return true; } + COAP_UDP_IP_CHANNEL_ERR("Failed to send CoAP message"); return false; } @@ -123,10 +136,11 @@ static bool _CoapUdpIpChannel_send_coap_message_with_payload(CoapUdpIpChannel *s ssize_t bytes_sent = gcoap_req_send(self->write_buffer, len, remote, NULL, resp_handler, NULL, GCOAP_SOCKET_TYPE_UDP); COAP_UDP_IP_CHANNEL_DEBUG("Sending %d bytes", bytes_sent); if (bytes_sent > 0) { - COAP_UDP_IP_CHANNEL_DEBUG("Message sent"); + COAP_UDP_IP_CHANNEL_DEBUG("CoAP Message sent"); return true; } + COAP_UDP_IP_CHANNEL_ERR("Failed to send CoAP message"); return false; } @@ -217,11 +231,11 @@ static void _CoapUdpIpChannel_client_open_connection_callback(const gcoap_reques if (memo->state == GCOAP_MEMO_TIMEOUT) { // Failure COAP_UDP_IP_CHANNEL_ERR("TIMEOUT => Try to connect again"); - _CoapUdpIpChannel_client_send_connect_message(self); // Try to connect again + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); } else if (coap_get_code_class(pdu) != COAP_CLASS_SUCCESS) { // Failure COAP_UDP_IP_CHANNEL_ERR("CONNECTION REJECTED => Try to connect again"); - _CoapUdpIpChannel_client_send_connect_message(self); // Try to connect again + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); } else { // Success _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED); @@ -254,7 +268,8 @@ static lf_ret_t CoapUdpIpChannel_open_connection(NetworkChannel *untyped_self) { // the connection to us as established. /* Client */ - return _CoapUdpIpChannel_client_send_connect_message(self); + _CoapUdpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_OPEN); + return LF_OK; } static void _CoapUdpIpChannel_client_close_connection_callback(const gcoap_request_memo_t *memo, coap_pkt_t *pdu, @@ -312,9 +327,6 @@ static lf_ret_t CoapUdpIpChannel_send_blocking(NetworkChannel *untyped_self, con if (_CoapUdpIpChannel_get_state(self) == NETWORK_CHANNEL_STATE_CONNECTED) { return LF_OK; - } else { - // Try to connect again - _CoapUdpIpChannel_client_send_connect_message(self); } } @@ -345,6 +357,44 @@ static bool CoapUdpIpChannel_is_connected(NetworkChannel *untyped_self) { return _CoapUdpIpChannel_get_state(self) == NETWORK_CHANNEL_STATE_CONNECTED; } +void *_CoapUdpIpChannel_connection_thread(void *arg) { + COAP_UDP_IP_CHANNEL_DEBUG("Start connection thread"); + (void)arg; + msg_t m; + + while (true) { + msg_receive(&m); + + CoapUdpIpChannel *self = m.content.ptr; + + switch (self->state) { + case NETWORK_CHANNEL_STATE_OPEN: { + /* try to connect */ + _CoapUdpIpChannel_client_send_connect_message(self); + } break; + + case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS: + /* nothing to do */ + break; + + case NETWORK_CHANNEL_STATE_LOST_CONNECTION: + case NETWORK_CHANNEL_STATE_CONNECTION_FAILED: { + /* try to reconnect */ + _CoapUdpIpChannel_client_send_connect_message(self); + } break; + + case NETWORK_CHANNEL_STATE_CONNECTED: + break; + + case NETWORK_CHANNEL_STATE_UNINITIALIZED: + case NETWORK_CHANNEL_STATE_CLOSED: + break; + } + } + + return NULL; +} + void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, Environment *env, const char *remote_address, int remote_protocol_family) { assert(self != NULL); @@ -360,6 +410,11 @@ void CoapUdpIpChannel_ctor(CoapUdpIpChannel *self, Environment *env, const char // Initialize coap server gcoap_register_listener(&_listener); + + // Create connection thread + _connection_thread_pid = + thread_create(_connection_thread_stack, sizeof(_connection_thread_stack), THREAD_PRIORITY_MAIN - 1, 0, + _CoapUdpIpChannel_connection_thread, NULL, "coap_connection_thread"); } // Super fields diff --git a/src/platform/riot/riot.c b/src/platform/riot/riot.c index 8b31c71a..ca7d6956 100644 --- a/src/platform/riot/riot.c +++ b/src/platform/riot/riot.c @@ -64,12 +64,12 @@ lf_ret_t PlatformRiot_wait_for(Platform *self, interval_t duration) { void PlatformRiot_leave_critical_section(Platform *self) { PlatformRiot *p = (PlatformRiot *)self; - p->irq_mask = irq_disable(); + irq_restore(p->irq_mask); } void PlatformRiot_enter_critical_section(Platform *self) { PlatformRiot *p = (PlatformRiot *)self; - irq_restore(p->irq_mask); + p->irq_mask = irq_disable(); } void PlatformRiot_new_async_event(Platform *self) { mutex_unlock(&((PlatformRiot *)self)->lock); } diff --git a/src/scheduler.c b/src/scheduler.c index 8cec05c3..96553c75 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -1,5 +1,4 @@ - #if defined(SCHEDULER_DYNAMIC) #include "./schedulers/dynamic/scheduler.c" #elif defined(SCHEDULER_STATIC) diff --git a/test/unit/tcp_channel_test.c b/test/unit/tcp_channel_test.c index 98cb573e..dea324c7 100644 --- a/test/unit/tcp_channel_test.c +++ b/test/unit/tcp_channel_test.c @@ -161,7 +161,7 @@ void test_socket_reset(void) { TEST_ASSERT_TRUE(client_channel->is_connected(client_channel)); // reset the client socket - ssize_t bytes_written = eventfd_write(_client_tcp_channel.send_failed_event_fds, 1); + ssize_t bytes_written = write(_client_tcp_channel.send_failed_event_fds[1], "X", 1); if (bytes_written == -1) { LF_ERR(NET, "Failed informing worker thread, that send_blocking failed errno=%d", errno); } else {