Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont require upstream port when constructing a Connection #65

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: clean test coverage asan format format-check ci lf-test lib proto
.PHONY: clean test coverage asan format format-check ci lf-test lib proto examples

test: unit-test lf-test
test: unit-test lf-test examples


# Generate protobuf code
Expand All @@ -14,7 +14,7 @@ lib:
make -C build


# Build and run examples
# Build examples
examples:
cmake -Bbuild -DBUILD_EXAMPLES=ON .
cmake --build build
Expand All @@ -29,8 +29,7 @@ unit-test:

# Build and run lf tests
lf-test:
@echo "Skipping LF tests"
#make -C test/lf
make -C test/lf

# Get coverage data on unit tests
coverage:
Expand Down
6 changes: 0 additions & 6 deletions examples/posix/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,3 @@ add_custom_target(examples
)

# Add each executable to the custom target
foreach(EXEC_NAME ${EXECUTABLES})
add_custom_command(TARGET examples
COMMAND ${EXEC_NAME}
COMMENT "Running ${EXEC_NAME}"
)
endforeach()
27 changes: 14 additions & 13 deletions examples/posix/testing_fed_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ typedef struct {
msg_t buffer[1];
} ConnSender;

void ConnSender_ctor(ConnSender *self, Reactor *parent, FederatedConnectionBundle *bundle, Port *upstream) {
FederatedOutputConnection_ctor(&self->super, parent, bundle, 0, upstream, &self->buffer[0], sizeof(self->buffer[0]));
void ConnSender_ctor(ConnSender *self, Reactor *parent, FederatedConnectionBundle *bundle) {
FederatedOutputConnection_ctor(&self->super, parent, bundle, 0, &self->buffer[0], sizeof(self->buffer[0]));
}

typedef struct {
Expand All @@ -133,20 +133,21 @@ typedef struct {

void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET);
ConnSender_ctor(&self->conn, &parent->super, &self->super, &parent->out.super.super);
ConnSender_ctor(&self->conn, &parent->super, &self->super);
self->output[0] = &self->conn.super;
CONN_REGISTER_UPSTREAM(self->conn, parent->out);

TcpIpChannel *channel = &self->channel;
int ret = channel->super.bind(channel);
NetworkChannel *channel = (NetworkChannel *)&self->channel;
int ret = channel->bind(channel);
validate(ret == LF_OK);
printf("Sender: Bound\n");

// accept one connection
bool new_connection = channel->super.accept(channel);
bool new_connection = channel->accept(channel);
validate(new_connection);
printf("Sender: Accepted\n");

FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel, NULL, 0,
FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, 0,
(FederatedOutputConnection **)&self->output, 1);
}

Expand All @@ -173,17 +174,17 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET);
self->inputs[0] = &self->conn.super;

TcpIpChannel *channel = &self->channel;
NetworkChannel *channel = (NetworkChannel *)&self->channel;

lf_ret_t ret;
do {
ret = channel->super.connect(channel);
ret = channel->connect(channel);
} while (ret != LF_OK);
validate(ret == LF_OK);
printf("Recv: Connected\n");

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel, (FederatedInputConnection **)&self->inputs, 1,
NULL, 0);
FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs,
1, NULL, 0);
}

// Reactor main
Expand Down Expand Up @@ -235,7 +236,7 @@ void *main_sender(void *unused) {
MainSender_ctor(&sender, &env_send);
env_send.set_timeout(&env_send, SEC(1));
env_send.net_channel_size = 1;
env_send.net_channels = (TcpIpChannel **)&sender.net_channel;
env_send.net_channels = (NetworkChannel **)&sender.net_channel;
env_send.assemble(&env_send);
env_send.start(&env_send);
return NULL;
Expand All @@ -252,7 +253,7 @@ void *main_recv(void *unused) {
env_recv.keep_alive = true;
env_recv.has_async_events = true;
env_recv.net_channel_size = 1;
env_recv.net_channels = (TcpIpChannel **)&receiver.net_channels;
env_recv.net_channels = (NetworkChannel **)&receiver.net_channels;
env_recv.assemble(&env_recv);
env_recv.platform->leave_critical_section(env_recv.platform);
env_recv.start(&env_recv);
Expand Down
10 changes: 5 additions & 5 deletions examples/posix/testing_posix_tcp_ip_channel_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ int main() {
TcpIpChannel_ctor(&channel, host, port, AF_INET);

// binding to that address
channel.super.connect(&channel);
channel.super.connect(&channel.super);

// change the super to non-blocking
channel.super.change_block_state(&channel, false);
channel.super.change_block_state(&channel.super, false);

for (int i = 0; i < NUM_ITER; i++) {
// sending message
channel.super.send(&channel, &port_message);
channel.super.send(&channel.super, &port_message);

// waiting for reply
TaggedMessage *received_message = NULL;
do {
received_message = channel.super.receive(&channel);
received_message = channel.super.receive(&channel.super);
} while (received_message == NULL);

printf("Received message with connection number %i and content %s\n", received_message->conn_id,
Expand All @@ -45,5 +45,5 @@ int main() {
sleep(i);
}

channel.super.close(&channel);
channel.super.close(&channel.super);
}
12 changes: 6 additions & 6 deletions examples/posix/testing_posix_tcp_ip_channel_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@ int main() {
TcpIpChannel_ctor(&channel, host, port, AF_INET);

// binding to that address
channel.super.bind(&channel);
channel.super.bind(&channel.super);

// change the super to non-blocking
channel.super.change_block_state(&channel, false);
channel.super.change_block_state(&channel.super, false);

// accept one connection
bool new_connection;
do {
new_connection = channel.super.accept(&channel);
new_connection = channel.super.accept(&channel.super);
} while (!new_connection);

// waiting for messages from client
TaggedMessage *message = NULL;
do {
message = channel.super.receive(&channel);
message = channel.super.receive(&channel.super);
sleep(1);
} while (message == NULL);

printf("Received message with connection number %i and content %s\n", message->conn_id,
(char *)message->payload.bytes);

channel.super.send(&channel, message);
channel.super.send(&channel.super, message);

channel.super.close(&channel);
channel.super.close(&channel.super);
}
12 changes: 6 additions & 6 deletions examples/posix/testing_tcp_ip_channel_server_callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ TcpIpChannel channel;

void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) {
printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes);
channel.super.send(&channel, msg);
channel.super.send(&channel.super, msg);
}

int main() {
Expand All @@ -19,20 +19,20 @@ int main() {
TcpIpChannel_ctor(&channel, host, port, AF_INET);

// binding to that address
channel.super.bind(&channel);
channel.super.bind(&channel.super);

// change the super to non-blocking
channel.super.change_block_state(&channel, false);
channel.super.change_block_state(&channel.super, false);

// accept one connection
bool new_connection;
do {
new_connection = channel.super.accept(&channel);
new_connection = channel.super.accept(&channel.super);
} while (!new_connection);

channel.super.register_callback(&channel, callback_handler, NULL);
channel.super.register_callback(&channel.super, callback_handler, NULL);

sleep(100);

channel.super.close(&channel);
channel.super.close(&channel.super);
}
2 changes: 1 addition & 1 deletion examples/posix/timer_ex.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

typedef struct {
Timer super;
Reaction *effects[0];
Reaction *effects[1];
} MyTimer;

typedef struct {
Expand Down
20 changes: 8 additions & 12 deletions include/reactor-uc/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ struct Connection {
* @param self The Connection object to construct
* @param type The type of the connection. Either logical, delayed or physical.
* @param parent The reactor in which this connection appears (not the reactors of the ports it connects)
* @param upstream The pointer to the upstream port of this connection
* @param downstreams A pointer to an array of pointers to downstream ports.
* @param num_downstreams The size of the downstreams array.
* @param trigger_value A pointer to the TriggerValue that holds the data of the events that are scheduled on this
Expand All @@ -53,35 +52,32 @@ struct Connection {
* @param cleanup The cleanup function that is called at the end of timestep after all reactions have executed.
* @param trigger_downstreams The function that triggers all downstreams of this connection.
*/
void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port *upstream, Port **downstreams,
size_t num_downstreams, TriggerValue *trigger_value, void (*prepare)(Trigger *),
void (*cleanup)(Trigger *), void (*trigger_downstreams)(Connection *, const void *, size_t));
void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams,
TriggerValue *trigger_value, void (*prepare)(Trigger *), void (*cleanup)(Trigger *),
void (*trigger_downstreams)(Connection *, const void *, size_t));

struct LogicalConnection {
Connection super;
};

void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port *upstream, Port **downstreams,
size_t num_downstreams);
void LogicalConnection_ctor(LogicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams);

struct DelayedConnection {
Connection super;
interval_t delay;
TriggerValue trigger_value;
};

void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port *upstream, Port **downstreams,
size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size,
size_t value_capacity);
void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams,
interval_t delay, void *value_buf, size_t value_size, size_t value_capacity);

struct PhysicalConnection {
Connection super;
interval_t delay;
TriggerValue trigger_value;
};

void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port *upstream, Port **downstreams,
size_t num_downstreams, interval_t delay, void *value_buf, size_t value_size,
size_t value_capacity);
void PhysicalConnection_ctor(PhysicalConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams,
interval_t delay, void *value_buf, size_t value_size, size_t value_capacity);

#endif
2 changes: 1 addition & 1 deletion include/reactor-uc/federated.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct FederatedOutputConnection {
};

void FederatedOutputConnection_ctor(FederatedOutputConnection *self, Reactor *parent, FederatedConnectionBundle *bundle,
int conn_id, Port *upstream, void *value_ptr, size_t value_size);
int conn_id, void *value_ptr, size_t value_size);

// A single input connection to this federate. Has a single upstream port
struct FederatedInputConnection {
Expand Down
7 changes: 6 additions & 1 deletion include/reactor-uc/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,17 @@
#define OUTPUT_REGISTER_SOURCE(output, source) TRIGGER_REGISTER_SOURCE((Output *)&(output), (Reaction *)&(source))

// Convenience macro to register a downstream port on a connection.
// TODO: Replace the entire function with an inline macro to save memory
#define CONN_REGISTER_DOWNSTREAM(conn, down) \
do { \
((Connection *)&(conn))->register_downstream((Connection *)&(conn), (Port *)&(down)); \
} while (0)

// Convenience macro to register an upstream port on a connection
#define CONN_REGISTER_UPSTREAM(conn, up) \
do { \
((Connection *)&(conn))->upstream = (Port *)&(up); \
} while (0)

// TODO: The following macro is defined to avoid compiler warnings. Ideally we would
// not have to specify any alignment on any structs. It is a TODO to understand exactly why
// the compiler complains and what we can do about it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,34 +121,36 @@ class UcConnectionGenerator(private val reactor: Reactor) {

fun generateReactorCtorCode(conn: UcConnection) = with(PrependOperator) {
"""
|${conn.codeType}_ctor(&self->${conn.codeName}, &self->super, (Port *) &self->${getPortCodeName(conn.src)});
|${conn.codeType}_ctor(&self->${conn.codeName}, &self->super);
${" | "..generateConnectionStatements(conn)}
|
""".trimMargin()
};
fun generateConnectionStatements(conn: UcConnection) = conn.getDests().joinToString(separator = "\n") {
"CONN_REGISTER_DOWNSTREAM(self->${conn.codeName}, self->${getPortCodeName(it)});"
}
fun generateConnectionStatements(conn: UcConnection) =
"CONN_REGISTER_UPSTREAM(self->${conn.codeName}, self->${getPortCodeName(conn.src)});\n" +
conn.getDests().joinToString(separator = "\n") {
"CONN_REGISTER_DOWNSTREAM(self->${conn.codeName}, self->${getPortCodeName(it)});"}

fun generateReactorCtorCodes() = getUcConnections().joinToString(prefix = "// Initialize connections\n", separator = "\n", postfix = "\n") { generateReactorCtorCode(it)}

fun generateLogicalCtor(conn: UcConnection) = with(PrependOperator) {
"""
|static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent, Port *upstream) {
| LogicalConnection_ctor(&self->super, parent, upstream, self->_downstreams, ${conn.getDests().size});
|static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent) {
| LogicalConnection_ctor(&self->super, parent, self->_downstreams, ${conn.getDests().size});
|}
""".trimMargin()
}
fun generateDelayedCtor(conn: UcConnection) = with(PrependOperator) {
"""
|static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent, Port *upstream) {
| DelayedConnection_ctor(&self->super, parent, upstream, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize});
|static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent) {
| DelayedConnection_ctor(&self->super, parent, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize});
|}
""".trimMargin()
}
fun generatePhysicalCtor(conn: UcConnection) = with(PrependOperator) {
"""
|static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent, Port *upstream) {
| PhysicalConnection_ctor(&self->super, parent, upstream, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize});
|static void ${conn.codeType}_ctor(${conn.codeType} *self, Reactor *parent) {
| PhysicalConnection_ctor(&self->super, parent, self->_downstreams, ${conn.getDests().size}, ${conn.conn.delay.toCCode()}, self->buffer, sizeof(self->buffer[0]), ${conn.bufSize});
|}
""".trimMargin()
}
Expand Down
Loading
Loading