diff --git a/.github/actions/memory-report/action.yml b/.github/actions/memory-report/action.yml index b313b60e..0c2d7261 100644 --- a/.github/actions/memory-report/action.yml +++ b/.github/actions/memory-report/action.yml @@ -24,7 +24,7 @@ runs: # For the multi line output # https://stackoverflow.com/questions/74137120/how-to-fix-or-avoid-error-unable-to-process-file-command-output-successfully run: | - make test + make unit-test EOF=$(dd if=/dev/urandom bs=15 count=1 status=none | base64) echo "report<<$EOF" >> $GITHUB_OUTPUT echo "$(cat build/test/unit/*.size)" >> $GITHUB_OUTPUT diff --git a/Makefile b/Makefile index b3e9453c..1843a9f9 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ -.PHONY: clean test coverage asan format format-check ci lf-test lib proto +.PHONY: clean test coverage asan format format-check ci lf-test lib proto examples -test: unit-test lf-test +test: unit-test lf-test examples # Generate protobuf code @@ -14,7 +14,7 @@ lib: make -C build -# Build and run examples +# Build examples examples: cmake -Bbuild -DBUILD_EXAMPLES=ON . cmake --build build @@ -29,8 +29,7 @@ unit-test: # Build and run lf tests lf-test: - @echo "Skipping LF tests" -#make -C test/lf + make -C test/lf # Get coverage data on unit tests coverage: diff --git a/examples/posix/CMakeLists.txt b/examples/posix/CMakeLists.txt index 33ffae80..72f8b024 100644 --- a/examples/posix/CMakeLists.txt +++ b/examples/posix/CMakeLists.txt @@ -29,9 +29,3 @@ add_custom_target(examples ) # Add each executable to the custom target -foreach(EXEC_NAME ${EXECUTABLES}) - add_custom_command(TARGET examples - COMMAND ${EXEC_NAME} - COMMENT "Running ${EXEC_NAME}" - ) -endforeach() \ No newline at end of file diff --git a/examples/posix/testing_fed_conn.c b/examples/posix/testing_fed_conn.c index 97ee6cd3..24c0234a 100644 --- a/examples/posix/testing_fed_conn.c +++ b/examples/posix/testing_fed_conn.c @@ -120,8 +120,8 @@ typedef struct { msg_t buffer[1]; } ConnSender; -void ConnSender_ctor(ConnSender *self, Reactor *parent, FederatedConnectionBundle *bundle, Port *upstream) { - FederatedOutputConnection_ctor(&self->super, parent, bundle, 0, upstream, &self->buffer[0], sizeof(self->buffer[0])); +void ConnSender_ctor(ConnSender *self, Reactor *parent, FederatedConnectionBundle *bundle) { + FederatedOutputConnection_ctor(&self->super, parent, bundle, 0, &self->buffer[0], sizeof(self->buffer[0])); } typedef struct { @@ -133,20 +133,21 @@ typedef struct { void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET); - ConnSender_ctor(&self->conn, &parent->super, &self->super, &parent->out.super.super); + ConnSender_ctor(&self->conn, &parent->super, &self->super); self->output[0] = &self->conn.super; + CONN_REGISTER_UPSTREAM(self->conn, parent->out); - TcpIpChannel *channel = &self->channel; - int ret = channel->super.bind(channel); + NetworkChannel *channel = (NetworkChannel *)&self->channel; + int ret = channel->bind(channel); validate(ret == LF_OK); printf("Sender: Bound\n"); // accept one connection - bool new_connection = channel->super.accept(channel); + bool new_connection = channel->accept(channel); validate(new_connection); printf("Sender: Accepted\n"); - FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel, NULL, 0, + FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, 0, (FederatedOutputConnection **)&self->output, 1); } @@ -173,17 +174,17 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET); self->inputs[0] = &self->conn.super; - TcpIpChannel *channel = &self->channel; + NetworkChannel *channel = (NetworkChannel *)&self->channel; lf_ret_t ret; do { - ret = channel->super.connect(channel); + ret = channel->connect(channel); } while (ret != LF_OK); validate(ret == LF_OK); printf("Recv: Connected\n"); - FederatedConnectionBundle_ctor(&self->super, parent, &self->channel, (FederatedInputConnection **)&self->inputs, 1, - NULL, 0); + FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs, + 1, NULL, 0); } // Reactor main @@ -235,7 +236,7 @@ void *main_sender(void *unused) { MainSender_ctor(&sender, &env_send); env_send.set_timeout(&env_send, SEC(1)); env_send.net_channel_size = 1; - env_send.net_channels = (TcpIpChannel **)&sender.net_channel; + env_send.net_channels = (NetworkChannel **)&sender.net_channel; env_send.assemble(&env_send); env_send.start(&env_send); return NULL; @@ -252,7 +253,7 @@ void *main_recv(void *unused) { env_recv.keep_alive = true; env_recv.has_async_events = true; env_recv.net_channel_size = 1; - env_recv.net_channels = (TcpIpChannel **)&receiver.net_channels; + env_recv.net_channels = (NetworkChannel **)&receiver.net_channels; env_recv.assemble(&env_recv); env_recv.platform->leave_critical_section(env_recv.platform); env_recv.start(&env_recv); diff --git a/examples/posix/testing_posix_tcp_ip_channel_client.c b/examples/posix/testing_posix_tcp_ip_channel_client.c index eb030e9e..7ea20bce 100644 --- a/examples/posix/testing_posix_tcp_ip_channel_client.c +++ b/examples/posix/testing_posix_tcp_ip_channel_client.c @@ -24,19 +24,19 @@ int main() { TcpIpChannel_ctor(&channel, host, port, AF_INET); // binding to that address - channel.super.connect(&channel); + channel.super.connect(&channel.super); // change the super to non-blocking - channel.super.change_block_state(&channel, false); + channel.super.change_block_state(&channel.super, false); for (int i = 0; i < NUM_ITER; i++) { // sending message - channel.super.send(&channel, &port_message); + channel.super.send(&channel.super, &port_message); // waiting for reply TaggedMessage *received_message = NULL; do { - received_message = channel.super.receive(&channel); + received_message = channel.super.receive(&channel.super); } while (received_message == NULL); printf("Received message with connection number %i and content %s\n", received_message->conn_id, @@ -45,5 +45,5 @@ int main() { sleep(i); } - channel.super.close(&channel); + channel.super.close(&channel.super); } diff --git a/examples/posix/testing_posix_tcp_ip_channel_server.c b/examples/posix/testing_posix_tcp_ip_channel_server.c index 518359ea..9c1c8516 100644 --- a/examples/posix/testing_posix_tcp_ip_channel_server.c +++ b/examples/posix/testing_posix_tcp_ip_channel_server.c @@ -13,28 +13,28 @@ int main() { TcpIpChannel_ctor(&channel, host, port, AF_INET); // binding to that address - channel.super.bind(&channel); + channel.super.bind(&channel.super); // change the super to non-blocking - channel.super.change_block_state(&channel, false); + channel.super.change_block_state(&channel.super, false); // accept one connection bool new_connection; do { - new_connection = channel.super.accept(&channel); + new_connection = channel.super.accept(&channel.super); } while (!new_connection); // waiting for messages from client TaggedMessage *message = NULL; do { - message = channel.super.receive(&channel); + message = channel.super.receive(&channel.super); sleep(1); } while (message == NULL); printf("Received message with connection number %i and content %s\n", message->conn_id, (char *)message->payload.bytes); - channel.super.send(&channel, message); + channel.super.send(&channel.super, message); - channel.super.close(&channel); + channel.super.close(&channel.super); } diff --git a/examples/posix/testing_tcp_ip_channel_server_callback.c b/examples/posix/testing_tcp_ip_channel_server_callback.c index 02f5c62e..9f6306c0 100644 --- a/examples/posix/testing_tcp_ip_channel_server_callback.c +++ b/examples/posix/testing_tcp_ip_channel_server_callback.c @@ -7,7 +7,7 @@ TcpIpChannel channel; void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); - channel.super.send(&channel, msg); + channel.super.send(&channel.super, msg); } int main() { @@ -19,20 +19,20 @@ int main() { TcpIpChannel_ctor(&channel, host, port, AF_INET); // binding to that address - channel.super.bind(&channel); + channel.super.bind(&channel.super); // change the super to non-blocking - channel.super.change_block_state(&channel, false); + channel.super.change_block_state(&channel.super, false); // accept one connection bool new_connection; do { - new_connection = channel.super.accept(&channel); + new_connection = channel.super.accept(&channel.super); } while (!new_connection); - channel.super.register_callback(&channel, callback_handler, NULL); + channel.super.register_callback(&channel.super, callback_handler, NULL); sleep(100); - channel.super.close(&channel); + channel.super.close(&channel.super); } diff --git a/examples/posix/timer_ex.c b/examples/posix/timer_ex.c index 5a5ff003..06357830 100644 --- a/examples/posix/timer_ex.c +++ b/examples/posix/timer_ex.c @@ -2,7 +2,7 @@ typedef struct { Timer super; - Reaction *effects[0]; + Reaction *effects[1]; } MyTimer; typedef struct { diff --git a/include/reactor-uc/connection.h b/include/reactor-uc/connection.h index b2d901d0..d7431ead 100644 --- a/include/reactor-uc/connection.h +++ b/include/reactor-uc/connection.h @@ -44,7 +44,6 @@ struct Connection { * @param self The Connection object to construct * @param type The type of the connection. Either logical, delayed or physical. * @param parent The reactor in which this connection appears (not the reactors of the ports it connects) - * @param upstream The pointer to the upstream port of this connection * @param downstreams A pointer to an array of pointers to downstream ports. * @param num_downstreams The size of the downstreams array. * @param trigger_value A pointer to the TriggerValue that holds the data of the events that are scheduled on this @@ -53,16 +52,15 @@ struct Connection { * @param cleanup The cleanup function that is called at the end of timestep after all reactions have executed. * @param trigger_downstreams The function that triggers all downstreams of this connection. */ -void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams, TriggerValue *trigger_value, void (*prepare)(Trigger *), - void (*cleanup)(Trigger *), void (*trigger_downstreams)(Connection *, const void *, size_t)); +void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams, + TriggerValue *trigger_value, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), + void (*trigger_downstreams)(Connection *, const void *, size_t)); struct LogicalConnection { Connection super; }; -void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams); +void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams); struct DelayedConnection { Connection super; @@ -70,9 +68,8 @@ struct DelayedConnection { TriggerValue trigger_value; }; -void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size, - size_t value_capacity); +void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, + interval_t delay, void *value_buf, size_t value_size, size_t value_capacity); struct PhysicalConnection { Connection super; @@ -80,8 +77,7 @@ struct PhysicalConnection { TriggerValue trigger_value; }; -void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size, - size_t value_capacity); +void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, + interval_t delay, void *value_buf, size_t value_size, size_t value_capacity); #endif diff --git a/include/reactor-uc/federated.h b/include/reactor-uc/federated.h index 76141246..cd4f0699 100644 --- a/include/reactor-uc/federated.h +++ b/include/reactor-uc/federated.h @@ -40,7 +40,7 @@ struct FederatedOutputConnection { }; void FederatedOutputConnection_ctor(FederatedOutputConnection *self, Reactor *parent, FederatedConnectionBundle *bundle, - int conn_id, Port *upstream, void *value_ptr, size_t value_size); + int conn_id, void *value_ptr, size_t value_size); // A single input connection to this federate. Has a single upstream port struct FederatedInputConnection { diff --git a/include/reactor-uc/macros.h b/include/reactor-uc/macros.h index aba74348..c224c200 100644 --- a/include/reactor-uc/macros.h +++ b/include/reactor-uc/macros.h @@ -79,12 +79,17 @@ #define OUTPUT_REGISTER_SOURCE(output, source) TRIGGER_REGISTER_SOURCE((Output *)&(output), (Reaction *)&(source)) // Convenience macro to register a downstream port on a connection. -// TODO: Replace the entire function with an inline macro to save memory #define CONN_REGISTER_DOWNSTREAM(conn, down) \ do { \ ((Connection *)&(conn))->register_downstream((Connection *)&(conn), (Port *)&(down)); \ } while (0) +// Convenience macro to register an upstream port on a connection +#define CONN_REGISTER_UPSTREAM(conn, up) \ + do { \ + ((Connection *)&(conn))->upstream = (Port *)&(up); \ + } while (0) + // TODO: The following macro is defined to avoid compiler warnings. Ideally we would // not have to specify any alignment on any structs. It is a TODO to understand exactly why // the compiler complains and what we can do about it. diff --git a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionGenerator.kt b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionGenerator.kt index 0d3b659d..5cc2e45d 100644 --- a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionGenerator.kt +++ b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionGenerator.kt @@ -121,34 +121,36 @@ class UcConnectionGenerator(private val reactor: Reactor) { fun generateReactorCtorCode(conn: UcConnection) = with(PrependOperator) { """ - |${conn.codeType}_ctor(&self->${conn.codeName}, &self->super, (Port *) &self->${getPortCodeName(conn.src)}); + |${conn.codeType}_ctor(&self->${conn.codeName}, &self->super); ${" | "..generateConnectionStatements(conn)} | """.trimMargin() }; - fun generateConnectionStatements(conn: UcConnection) = conn.getDests().joinToString(separator = "\n") { - "CONN_REGISTER_DOWNSTREAM(self->${conn.codeName}, self->${getPortCodeName(it)});" - } + fun generateConnectionStatements(conn: UcConnection) = + "CONN_REGISTER_UPSTREAM(self->${conn.codeName}, self->${getPortCodeName(conn.src)});\n" + + conn.getDests().joinToString(separator = "\n") { + "CONN_REGISTER_DOWNSTREAM(self->${conn.codeName}, self->${getPortCodeName(it)});"} + fun generateReactorCtorCodes() = getUcConnections().joinToString(prefix = "// Initialize connections\n", separator = "\n", postfix = "\n") { generateReactorCtorCode(it)} fun generateLogicalCtor(conn: UcConnection) = with(PrependOperator) { """ - |static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent, Port *upstream) { - | LogicalConnection_ctor(&self->super, parent, upstream, self->_downstreams, ${conn.getDests().size}); + |static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent) { + | LogicalConnection_ctor(&self->super, parent, self->_downstreams, ${conn.getDests().size}); |} """.trimMargin() } fun generateDelayedCtor(conn: UcConnection) = with(PrependOperator) { """ - |static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent, Port *upstream) { - | DelayedConnection_ctor(&self->super, parent, upstream, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize}); + |static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent) { + | DelayedConnection_ctor(&self->super, parent, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize}); |} """.trimMargin() } fun generatePhysicalCtor(conn: UcConnection) = with(PrependOperator) { """ - |static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent, Port *upstream) { - | PhysicalConnection_ctor(&self->super, parent, upstream, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize}); + |static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent) { + | PhysicalConnection_ctor(&self->super, parent, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize}); |} """.trimMargin() } diff --git a/src/connection.c b/src/connection.c index 5de34eed..c01da25b 100644 --- a/src/connection.c +++ b/src/connection.c @@ -61,14 +61,11 @@ void LogicalConnection_trigger_downstreams(Connection *self, const void *value, } } -void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams, TriggerValue *trigger_value, void (*prepare)(Trigger *), - void (*cleanup)(Trigger *), void (*trigger_downstreams)(Connection *, const void *, size_t)) { - // FIXME: Should not be part of constructor... - if (upstream) { - upstream->conn_out = self; - } - self->upstream = upstream; +void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams, + TriggerValue *trigger_value, void (*prepare)(Trigger *), void (*cleanup)(Trigger *), + void (*trigger_downstreams)(Connection *, const void *, size_t)) { + + self->upstream = NULL; self->downstreams_size = num_downstreams; self->downstreams_registered = 0; self->downstreams = downstreams; @@ -79,9 +76,8 @@ void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port * Trigger_ctor(&self->super, type, parent, trigger_value, prepare, cleanup, NULL); } -void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams) { - Connection_ctor(&self->super, TRIG_CONN, parent, upstream, downstreams, num_downstreams, NULL, NULL, NULL, +void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams) { + Connection_ctor(&self->super, TRIG_CONN, parent, downstreams, num_downstreams, NULL, NULL, NULL, LogicalConnection_trigger_downstreams); } @@ -152,12 +148,11 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value, sched->register_for_cleanup(sched, &_self->super); } -void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size, - size_t value_capacity) { +void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, + interval_t delay, void *value_buf, size_t value_size, size_t value_capacity) { self->delay = delay; TriggerValue_ctor(&self->trigger_value, value_buf, value_size, value_capacity); - Connection_ctor(&self->super, TRIG_CONN_DELAYED, parent, upstream, downstreams, num_downstreams, &self->trigger_value, + Connection_ctor(&self->super, TRIG_CONN_DELAYED, parent, downstreams, num_downstreams, &self->trigger_value, DelayedConnection_prepare, DelayedConnection_cleanup, DelayedConnection_trigger_downstreams); } @@ -213,12 +208,10 @@ void PhysicalConnection_trigger_downstreams(Connection *_self, const void *value // Possibly handle } -void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port *upstream, Port **downstreams, - size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size, - size_t value_capacity) { +void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, + interval_t delay, void *value_buf, size_t value_size, size_t value_capacity) { TriggerValue_ctor(&self->trigger_value, value_buf, value_size, value_capacity); - Connection_ctor(&self->super, TRIG_CONN_PHYSICAL, parent, upstream, downstreams, num_downstreams, - &self->trigger_value, PhysicalConnection_prepare, PhysicalConnection_cleanup, - PhysicalConnection_trigger_downstreams); + Connection_ctor(&self->super, TRIG_CONN_PHYSICAL, parent, downstreams, num_downstreams, &self->trigger_value, + PhysicalConnection_prepare, PhysicalConnection_cleanup, PhysicalConnection_trigger_downstreams); self->delay = delay; } diff --git a/src/environment.c b/src/environment.c index bf9ebc5b..dca45066 100644 --- a/src/environment.c +++ b/src/environment.c @@ -1,6 +1,5 @@ #include "reactor-uc/environment.h" #include "reactor-uc/logging.h" -#include "reactor-uc/platform/posix/tcp_ip_channel.h" // FIXME: NetworkChannel instead #include "reactor-uc/reactor.h" #include "reactor-uc/scheduler.h" #include diff --git a/src/federated.c b/src/federated.c index f0adc4b9..3c7edd0e 100644 --- a/src/federated.c +++ b/src/federated.c @@ -48,9 +48,9 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { } void FederatedOutputConnection_ctor(FederatedOutputConnection *self, Reactor *parent, FederatedConnectionBundle *bundle, - int conn_id, Port *upstream, void *value_ptr, size_t value_size) { + int conn_id, void *value_ptr, size_t value_size) { - Connection_ctor(&self->super, TRIG_CONN_FEDERATED_OUTPUT, parent, upstream, NULL, 0, NULL, NULL, + Connection_ctor(&self->super, TRIG_CONN_FEDERATED_OUTPUT, parent, NULL, 0, NULL, NULL, FederatedOutputConnection_cleanup, FederatedOutputConnection_trigger_downstream); self->staged = false; self->conn_id = conn_id; @@ -93,8 +93,8 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare Port **downstreams, size_t downstreams_size, void *value_buf, size_t value_size, size_t value_capacity) { TriggerValue_ctor(&self->trigger_value, value_buf, value_size, value_capacity); - Connection_ctor(&self->super, TRIG_CONN_FEDERATED_INPUT, parent, NULL, downstreams, downstreams_size, - &self->trigger_value, FederatedInputConnection_prepare, FederatedInputConnection_cleanup, NULL); + Connection_ctor(&self->super, TRIG_CONN_FEDERATED_INPUT, parent, downstreams, downstreams_size, &self->trigger_value, + FederatedInputConnection_prepare, FederatedInputConnection_cleanup, NULL); self->delay = delay; self->is_physical = is_physical; self->last_known_tag = NEVER_TAG; diff --git a/test/unit/delayed_conn_test.c b/test/unit/delayed_conn_test.c index ab2e88bb..03c1dbb9 100644 --- a/test/unit/delayed_conn_test.c +++ b/test/unit/delayed_conn_test.c @@ -111,8 +111,8 @@ struct Conn1 { Input *downstreams[1]; }; -void Conn1_ctor(struct Conn1 *self, Reactor *parent, Output *upstream) { - DelayedConnection_ctor(&self->super, parent, &upstream->super, (Port **)self->downstreams, 1, MSEC(150), self->buffer, +void Conn1_ctor(struct Conn1 *self, Reactor *parent) { + DelayedConnection_ctor(&self->super, parent, (Port **)self->downstreams, 1, MSEC(150), self->buffer, sizeof(self->buffer[0]), 2); } @@ -133,8 +133,9 @@ void Main_ctor(struct Main *self, Environment *env) { self->_children[1] = &self->receiver.super; Receiver_ctor(&self->receiver, &self->super, env); - Conn1_ctor(&self->conn, &self->super, &self->sender.out.super); - self->conn.super.super.register_downstream(&self->conn.super.super, &self->receiver.inp.super.super); + Conn1_ctor(&self->conn, &self->super); + CONN_REGISTER_UPSTREAM(self->conn, self->sender.out); + CONN_REGISTER_DOWNSTREAM(self->conn, self->receiver.inp); Reactor_ctor(&self->super, "Main", env, NULL, self->_children, 2, NULL, 0, NULL, 0); } diff --git a/test/unit/port_test.c b/test/unit/port_test.c index 96758f3b..a67bb779 100644 --- a/test/unit/port_test.c +++ b/test/unit/port_test.c @@ -106,8 +106,8 @@ struct Conn1 { Input *downstreams[1]; }; -void Conn1_ctor(struct Conn1 *self, Reactor *parent, Output *upstream) { - LogicalConnection_ctor(&self->super, parent, &upstream->super, (Port **)self->downstreams, 1); +void Conn1_ctor(struct Conn1 *self, Reactor *parent) { + LogicalConnection_ctor(&self->super, parent, (Port **)self->downstreams, 1); } // Reactor main @@ -127,8 +127,9 @@ void Main_ctor(struct Main *self, Environment *env) { self->_children[1] = &self->receiver.super; Receiver_ctor(&self->receiver, &self->super, env); - Conn1_ctor(&self->conn, &self->super, &self->sender.out.super); - self->conn.super.super.register_downstream(&self->conn.super.super, &self->receiver.inp.super.super); + Conn1_ctor(&self->conn, &self->super); + CONN_REGISTER_UPSTREAM(self->conn, self->sender.out); + CONN_REGISTER_DOWNSTREAM(self->conn, self->receiver.inp); Reactor_ctor(&self->super, "Main", env, NULL, self->_children, 2, NULL, 0, NULL, 0); }