Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
erlingrj committed Jan 3, 2025
1 parent c049c73 commit 991264a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 74 deletions.
4 changes: 2 additions & 2 deletions include/reactor-uc/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ typedef struct FederatedInputConnection FederatedInputConnection;
} ReactorName##_##InputName;

#define LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(ReactorName, InputName, BufferType, BufferSize, Delay, IsPhysical) \
void ReactorName##_##InputName##_conn_ctor(ReactorName##_##InputName *self, Reactor *parent) { \
void ReactorName##_##InputName##_conn_ctor(ReactorName##_##InputName *self, Reactor *parent) { \
FederatedInputConnection_ctor(&self->super, parent, Delay, IsPhysical, (Port **)&self->downstreams, 1, \
(void *)&self->payload_buf, (bool *)&self->payload_used_buf, sizeof(BufferType), \
BufferSize); \
Expand All @@ -621,7 +621,7 @@ typedef struct FederatedInputConnection FederatedInputConnection;

#define LF_INITIALIZE_FEDERATED_INPUT_CONNECTION(ReactorName, InputName, DeserializeFunc) \
ReactorName##_##InputName##_conn_ctor(&self->InputName, self->super.parent); \
self->inputs[_inputs_idx] = &self->InputName.super; \
self->inputs[_inputs_idx] = &self->InputName.super; \
self->deserialize_hooks[_inputs_idx] = DeserializeFunc; \
_inputs_idx++;

Expand Down
1 change: 0 additions & 1 deletion include/reactor-uc/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ void lf_connect_federated_output(Connection *connection, Port *output);

void lf_connect_federated_input(Connection *connection, Port *input);


#endif
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import org.lflang.generator.PrependOperator
import org.lflang.generator.orNever
import org.lflang.generator.uc.UcConnectionGenerator.Companion.isFederated
import org.lflang.generator.uc.UcInstanceGenerator.Companion.codeTypeFederate
import org.lflang.generator.uc.UcInstanceGenerator.Companion.isAFederate
import org.lflang.generator.uc.UcInstanceGenerator.Companion.width
import org.lflang.generator.uc.UcPortGenerator.Companion.width
import org.lflang.generator.uc.UcReactorGenerator.Companion.codeType
Expand All @@ -16,12 +15,7 @@ class UcFederate(val federate: Instantiation, bankIdx: Int) {
val isBank = federate.isBank
}

class UcConnection(val src: VarRef) {

}

class UcConnectionChannel(val src: UcChannel, val dest: UcChannel, val conn: Connection) {

companion object {
fun parseConnectionChannels(conn: Connection): List<UcConnectionChannel> {
val res = mutableListOf<UcConnectionChannel>()
Expand Down Expand Up @@ -69,7 +63,7 @@ class UcConnectionChannel(val src: UcChannel, val dest: UcChannel, val conn: Con
}
}

open class UcGroupedConnection2(val src: VarRef, val channels: List<UcConnectionChannel>, val lfConn: Connection, val uid: Int) {
class UcGroupedConnection(val src: VarRef, val channels: List<UcConnectionChannel>, val lfConn: Connection, val uid: Int) {
val delay = lfConn.delay.orNever().toCCode()
val isPhysical = lfConn.isPhysical
val isLogical = lfConn.isLogical
Expand All @@ -96,8 +90,8 @@ open class UcGroupedConnection2(val src: VarRef, val channels: List<UcConnection
private val Connection.isLogical
get(): Boolean = !this.isPhysical && this.delay == null

fun groupConnections(channels: List<UcConnectionChannel>): List<UcGroupedConnection2> {
val res = mutableListOf<UcGroupedConnection2>()
fun groupConnections(channels: List<UcConnectionChannel>): List<UcGroupedConnection> {
val res = mutableListOf<UcGroupedConnection>()
val channels = HashSet(channels)

while (channels.isNotEmpty()) {
Expand All @@ -118,7 +112,7 @@ open class UcGroupedConnection2(val src: VarRef, val channels: List<UcConnection
}

val groupedChannels = others.plus(c)
val groupedConnection = UcGroupedConnection2(c.src.varRef, groupedChannels, c.conn, res.size)
val groupedConnection = UcGroupedConnection(c.src.varRef, groupedChannels, c.conn, res.size)
res.add(groupedConnection)
channels.removeAll(groupedChannels)
}
Expand All @@ -131,46 +125,10 @@ class UcFederatedConnectionBundle2(val src: UcFederate, val dest: UcFederate, va
}


// Representing a runtime connection with a single upstream port. An optimization is to group
// all identical connections (with same delay and is_physical) in a single connection
class UcGroupedConnection(val srcVarRef: VarRef, val dstVarRef: VarRef, val conn: Connection, val uid: Int) {
private val dests = mutableListOf<List<Pair<UcChannel, UcChannel>>>()
private var destsBuffer = mutableListOf<Pair<UcChannel, UcChannel>>()
val srcInst: Instantiation? = srcVarRef.container
val dstInst: Instantiation? =
dstVarRef.container // FIXME: A little hacky since it only makes sense for FederatedGroupConnections
val srcPort = srcVarRef.variable as Port
val bankWidth = srcInst?.width ?: 1
val portWidth = srcPort.width
val uniqueName = "conn_${srcInst?.name ?: ""}_${srcPort.name}_${uid}"
val maxNumPendingEvents = 16 // FIXME: Must be derived from the program

val delay = conn.delay.orNever().toCCode()
val isPhysical = conn.isPhysical
val isLogical = !isPhysical && conn.delay == null
val isFederated = srcInst != null && srcInst.isAFederate
val serializeFunc = "serialize_payload_default"
val deserializeFunc = "deserialize_payload_default"


fun numDownstreams() = dests.size

fun getDests() = dests

fun addDest(channels: Pair<UcChannel, UcChannel>) {
destsBuffer.add(channels)
}

fun commit() {
dests.add(destsBuffer)
destsBuffer = mutableListOf<Pair<UcChannel, UcChannel>>()
}
}

class UcFederatedConnectionBundle(
val src: Instantiation,
val dst: Instantiation,
val groupedConnections: List<UcGroupedConnection2>
val groupedConnections: List<UcGroupedConnection>
) {
val channelCodeType = "TcpIpChannel"

Expand Down Expand Up @@ -341,13 +299,13 @@ class UcPort(val varRef: VarRef) {
//}

class UcConnectionGenerator(private val reactor: Reactor, private val federate: Instantiation?) {
private val ucGroupedConnections: List<UcGroupedConnection2>
private val ucGroupedConnections: List<UcGroupedConnection>
private val ucFederatedConnectionBundles: List<UcFederatedConnectionBundle>

init {
val channels = mutableListOf<UcConnectionChannel>()
reactor.allConnections.forEach { channels.addAll(UcConnectionChannel.parseConnectionChannels(it)) }
ucGroupedConnections = UcGroupedConnection2.groupConnections(channels)
ucGroupedConnections = UcGroupedConnection.groupConnections(channels)

ucFederatedConnectionBundles = emptyList()
}
Expand All @@ -369,56 +327,56 @@ class UcConnectionGenerator(private val reactor: Reactor, private val federate:
return count
}

private fun generateLogicalSelfStruct(conn: UcGroupedConnection2) =
private fun generateLogicalSelfStruct(conn: UcGroupedConnection) =
"LF_DEFINE_LOGICAL_CONNECTION_STRUCT(${reactor.codeType}, ${conn.uniqueName}, ${conn.numDownstreams()});"

private fun generateLogicalCtor(conn: UcGroupedConnection2) =
private fun generateLogicalCtor(conn: UcGroupedConnection) =
"LF_DEFINE_LOGICAL_CONNECTION_CTOR(${reactor.codeType}, ${conn.uniqueName}, ${conn.numDownstreams()});"

private fun generateDelayedSelfStruct(conn: UcGroupedConnection2) =
private fun generateDelayedSelfStruct(conn: UcGroupedConnection) =
"LF_DEFINE_DELAYED_CONNECTION_STRUCT(${reactor.codeType}, ${conn.uniqueName}, ${conn.numDownstreams()}, ${conn.srcPort.type.toText()}, ${conn.maxNumPendingEvents}, ${conn.delay});"

private fun generateDelayedCtor(conn: UcGroupedConnection2) =
private fun generateDelayedCtor(conn: UcGroupedConnection) =
"LF_DEFINE_DELAYED_CONNECTION_CTOR(${reactor.codeType}, ${conn.uniqueName}, ${conn.numDownstreams()}, ${conn.srcPort.type.toText()}, ${conn.maxNumPendingEvents}, ${conn.delay}, ${conn.isPhysical});"

private fun generateFederatedInputSelfStruct(conn: UcGroupedConnection2) =
private fun generateFederatedInputSelfStruct(conn: UcGroupedConnection) =
"LF_DEFINE_FEDERATED_INPUT_CONNECTION_STRUCT(${reactor.codeType}, ${conn.uniqueName}, ${conn.srcPort.type.toText()}, ${conn.maxNumPendingEvents});"

private fun generateFederatedInputCtor(conn: UcGroupedConnection2) =
private fun generateFederatedInputCtor(conn: UcGroupedConnection) =
"LF_DEFINE_FEDERATED_INPUT_CONNECTION_CTOR(${reactor.codeType}, ${conn.uniqueName}, ${conn.srcPort.type.toText()}, ${conn.maxNumPendingEvents}, ${conn.delay}, ${conn.isPhysical});"

private fun generateFederatedOutputSelfStruct(conn: UcGroupedConnection2) =
private fun generateFederatedOutputSelfStruct(conn: UcGroupedConnection) =
"LF_DEFINE_FEDERATED_OUTPUT_CONNECTION_STRUCT(${reactor.codeType}, ${conn.uniqueName}, ${conn.srcPort.type.toText()});"

private fun generateFederatedOutputCtor(conn: UcGroupedConnection2) =
private fun generateFederatedOutputCtor(conn: UcGroupedConnection) =
"LF_DEFINE_FEDERATED_OUTPUT_CONNECTION_CTOR(${reactor.codeType}, ${conn.uniqueName}, ${conn.srcPort.type.toText()});"

private fun generateFederatedConnectionSelfStruct(conn: UcGroupedConnection2) =
private fun generateFederatedConnectionSelfStruct(conn: UcGroupedConnection) =
if (conn.srcInst == federate) generateFederatedOutputSelfStruct(conn) else generateFederatedInputSelfStruct(conn)

private fun generateFederatedConnectionCtor(conn: UcGroupedConnection2) =
private fun generateFederatedConnectionCtor(conn: UcGroupedConnection) =
if (conn.srcInst == federate) generateFederatedOutputCtor(conn) else generateFederatedInputCtor(conn)

private fun generateFederatedOutputInstance(conn: UcGroupedConnection2) =
private fun generateFederatedOutputInstance(conn: UcGroupedConnection) =
"LF_FEDERATED_OUTPUT_CONNECTION_INSTANCE(${reactor.codeType}, ${conn.uniqueName});"

private fun generateFederatedInputInstance(conn: UcGroupedConnection2) =
private fun generateFederatedInputInstance(conn: UcGroupedConnection) =
"LF_FEDERATED_INPUT_CONNECTION_INSTANCE(${reactor.codeType}, ${conn.uniqueName});"

private fun generateFederatedConnectionInstance(conn: UcGroupedConnection2) =
private fun generateFederatedConnectionInstance(conn: UcGroupedConnection) =
if (conn.srcInst == federate) generateFederatedOutputInstance(conn) else generateFederatedInputInstance(conn)

private fun generateInitializeFederatedOutput(conn: UcGroupedConnection2) =
private fun generateInitializeFederatedOutput(conn: UcGroupedConnection) =
"LF_INITIALIZE_FEDERATED_OUTPUT_CONNECTION(${reactor.codeType}, ${conn.uniqueName}, ${conn.serializeFunc});"

private fun generateInitializeFederatedInput(conn: UcGroupedConnection2) =
private fun generateInitializeFederatedInput(conn: UcGroupedConnection) =
"LF_INITIALIZE_FEDERATED_INPUT_CONNECTION(${reactor.codeType}, ${conn.uniqueName}, ${conn.deserializeFunc});"

private fun generateInitializeFederatedConnection(conn: UcGroupedConnection2) =
private fun generateInitializeFederatedConnection(conn: UcGroupedConnection) =
if (conn.srcInst == federate) generateInitializeFederatedOutput(conn) else generateInitializeFederatedInput(conn)


private fun generateReactorCtorCode(conn: UcGroupedConnection2) = with(PrependOperator) {
private fun generateReactorCtorCode(conn: UcGroupedConnection) = with(PrependOperator) {
"""
|${if (conn.isLogical) "LF_INITIALIZE_LOGICAL_CONNECTION(" else "LF_INITIALIZE_DELAYED_CONNECTION("}${reactor.codeType}, ${conn.uniqueName}, ${conn.bankWidth}, ${conn.portWidth});
""".trimMargin()
Expand All @@ -427,23 +385,23 @@ class UcConnectionGenerator(private val reactor: Reactor, private val federate:
private fun generateFederateCtorCode(conn: UcFederatedConnectionBundle) =
"LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(${conn.src.codeTypeFederate}, ${conn.dst.codeTypeFederate});"

private fun generateConnectChannel(groupedConn: UcGroupedConnection2, channel: UcConnectionChannel) =
private fun generateConnectChannel(groupedConn: UcGroupedConnection, channel: UcConnectionChannel) =
with(PrependOperator) {
"""|lf_connect((Connection *) &self->${groupedConn.uniqueName}[${channel.src.bank_idx}][${channel.src.port_idx}], (Port *) ${channel.src.generateChannelPointer()}, (Port *) ${channel.dest.generateChannelPointer()});
""".trimMargin()
}

private fun generateConnectionStatements(conn: UcGroupedConnection2) = with(PrependOperator) {
private fun generateConnectionStatements(conn: UcGroupedConnection) = with(PrependOperator) {
conn.channels
.joinToString(separator = "\n") { generateConnectChannel(conn, it) }
}

private fun generateConnectFederateOutputChannel(bundle: UcFederatedConnectionBundle, conn: UcGroupedConnection2) =
private fun generateConnectFederateOutputChannel(bundle: UcFederatedConnectionBundle, conn: UcGroupedConnection) =
conn.channels.joinWithLn {
"lf_connect_federated_output((Connection *)&self->LF_FEDERATED_CONNECTION_BUNDLE_NAME(${bundle.src.codeTypeFederate}, ${bundle.dst.codeTypeFederate}).${conn.uniqueName}, (Port*) &self->${bundle.src.name}[0].${it.src.varRef.name}[0]);"
}

private fun generateConnectFederateInputChannel(bundle: UcFederatedConnectionBundle, conn: UcGroupedConnection2) =
private fun generateConnectFederateInputChannel(bundle: UcFederatedConnectionBundle, conn: UcGroupedConnection) =
conn.channels.joinWithLn {
"lf_connect_federated_input((Connection *)&self->LF_FEDERATED_CONNECTION_BUNDLE_NAME(${bundle.src.codeTypeFederate}, ${bundle.dst.codeTypeFederate}).${conn.uniqueName}, (Port*) &self->${bundle.dst.name}[0].${it.dest.varRef.name}[0]);"
}
Expand Down
2 changes: 1 addition & 1 deletion src/platform/posix/tcp_ip_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ static lf_ret_t _TcpIpChannel_receive(NetworkChannel *untyped_self, FederateMess
continue;
} else if (bytes_read == 0) {
// This means the connection was closed.
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CLOSED);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CLOSED);
TCP_IP_CHANNEL_DEBUG("Other federate gracefully closed socket");
return LF_ERR;
}
Expand Down

0 comments on commit 991264a

Please sign in to comment.