Skip to content

Commit

Permalink
improve auto networking mode
Browse files Browse the repository at this point in the history
  • Loading branch information
froks committed Sep 27, 2024
1 parent f22d2e7 commit d34a08e
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 32 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ plugins {
apply<NexusReleasePlugin>()

group = "io.github.doip-sim-ecu"
version = "0.14.2"
version = "0.15.0"

repositories {
gradlePluginPortal()
Expand Down
116 changes: 88 additions & 28 deletions src/main/kotlin/NetworkHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,44 @@ import kotlin.concurrent.fixedRateTimer
import kotlin.concurrent.thread
import kotlin.system.exitProcess

public open class UdpNetworkBindingAny(private val port: Int = 13400, private val sendVirReply: (target: SocketAddress) -> Unit) {
private val logger = LoggerFactory.getLogger(UdpNetworkBindingAny::class.java)

private lateinit var udpServerSocket: BoundDatagramSocket

public open fun start() {
thread(name = "UDP") {
runBlocking {
udpServerSocket = aSocket(ActorSelectorManager(Dispatchers.IO))
.udp()
.bind(localAddress = InetSocketAddress(hostname = "0.0.0.0", port = port)) {
broadcast = true
reuseAddress = true
// reusePort = true // not supported on windows
typeOfService = TypeOfService.IPTOS_RELIABILITY
// socket.joinGroup(multicastAddress)
}
logger.info("Listening on udp: ${udpServerSocket.localAddress}")

while (!udpServerSocket.isClosed) {
val datagram = udpServerSocket.receive()
try {
val message = DoipUdpMessageParser.parseUDP(datagram.packet)
if (message is DoipUdpVehicleInformationRequest ||
message is DoipUdpVehicleInformationRequestWithEid ||
message is DoipUdpVehicleInformationRequestWithVIN
) {
sendVirReply(datagram.address)
}
} catch (e: Exception) {
logger.error("Unknown error while processing message", e)
}
}
}
}
}
}

public open class UdpNetworkBinding(
private val localAddress: String,
private val port: Int = 13400,
Expand All @@ -37,39 +75,51 @@ public open class UdpNetworkBinding(

private val udpMessageHandlers = doipEntities.associateWith { it.createDoipUdpMessageHandler() }

protected open suspend fun startVamTimer(socket: BoundDatagramSocket, doipEntitiesFilter: List<DoipEntity<*>>? = null) {
public open suspend fun sendVirReply(address: SocketAddress) {
internalSendVams(address, null)
}

protected open suspend fun startVamTimer(doipEntitiesFilter: List<DoipEntity<*>>? = null) {
if (broadcastEnabled) {
sendVams(socket, doipEntitiesFilter)
sendVams(doipEntitiesFilter)
}
}

protected open suspend fun sendVams(socket: BoundDatagramSocket, doipEntitiesFilter: List<DoipEntity<*>>? = null) {
var vamSentCounter = 0

protected open suspend fun internalSendVams(
address: SocketAddress,
doipEntitiesFilter: List<DoipEntity<*>>? = null
) {
val entries = doipEntities.associateWith { it.generateVehicleAnnouncementMessages() }

entries.forEach { (doipEntity, vams) ->
MDC.put("ecu", doipEntity.name)
vams.forEach { vam ->
if (doipEntitiesFilter != null && doipEntitiesFilter.none { vam.logicalAddress == it.config.logicalAddress }) {
return@forEach
}
logger.info("Sending VAM for ${vam.logicalAddress.toByteArray().toHexString()} as broadcast")
udpServerSocket.send(
Datagram(
packet = ByteReadPacket(vam.asByteArray),
address = address
)
)
}
}
}

protected open fun sendVams(doipEntitiesFilter: List<DoipEntity<*>>? = null) {
var vamSentCounter = 0

fixedRateTimer("VAM", daemon = true, initialDelay = 500, period = 500) {
if (vamSentCounter >= 3) {
this.cancel()
return@fixedRateTimer
}
entries.forEach { (doipEntity, vams) ->
MDC.put("ecu", doipEntity.name)
vams.forEach { vam ->
if (doipEntitiesFilter != null && doipEntitiesFilter.none { vam.logicalAddress == it.config.logicalAddress }) {
return@forEach
}
logger.info("Sending VAM for ${vam.logicalAddress.toByteArray().toHexString()} as broadcast")
runBlocking(Dispatchers.IO) {
launch(MDCContext()) {
socket.send(
Datagram(
packet = ByteReadPacket(vam.asByteArray),
address = InetSocketAddress(broadcastAddress, port)
)
)
}
}

runBlocking(Dispatchers.IO) {
launch(MDCContext()) {
internalSendVams(InetSocketAddress(broadcastAddress, port), doipEntitiesFilter)
}
}

Expand All @@ -78,7 +128,7 @@ public open class UdpNetworkBinding(
}

public suspend fun resendVams(doipEntitiesFilter: List<DoipEntity<*>>? = null) {
startVamTimer(udpServerSocket, doipEntitiesFilter)
startVamTimer(doipEntitiesFilter)
}

public fun start() {
Expand All @@ -94,7 +144,7 @@ public open class UdpNetworkBinding(
// socket.joinGroup(multicastAddress)
}
logger.info("Listening on udp: ${udpServerSocket.localAddress}")
startVamTimer(udpServerSocket)
startVamTimer()

while (!udpServerSocket.isClosed) {
val datagram = udpServerSocket.receive()
Expand Down Expand Up @@ -159,7 +209,11 @@ public open class TcpNetworkBinding(
public fun isEcuHardResetting(targetAddress: Short): Boolean =
hardResettingEcus.contains(targetAddress)

public fun hardResetEcuFor(activeConnection: ActiveConnection, logicalAddress: Short, duration: kotlin.time.Duration) {
public fun hardResetEcuFor(
activeConnection: ActiveConnection,
logicalAddress: Short,
duration: kotlin.time.Duration
) {
val isDoipEntity = doipEntities.any { it.config.logicalAddress == logicalAddress }

if (isDoipEntity) {
Expand Down Expand Up @@ -319,7 +373,8 @@ public open class TcpNetworkBinding(
this.socket = socket

scope.launch(Dispatchers.IO) {
val handler = networkManager.createTcpConnectionMessageHandler(doipEntities, socket, networkBinding.tlsOptions)
val handler =
networkManager.createTcpConnectionMessageHandler(doipEntities, socket, networkBinding.tlsOptions)

val entity = doipEntities.first()

Expand All @@ -330,7 +385,6 @@ public open class TcpNetworkBinding(
val parser = DoipTcpMessageParser(doipEntities.first().config.maxDataSize - 8)
while (!socket.isClosed && !closed) {
val message = parser.parseDoipTcpMessage(input)

try {
MDC.put("ecu", entity.name)
if (message is DoipTcpDiagMessage && networkBinding.isEcuHardResetting(message.targetAddress)) {
Expand Down Expand Up @@ -370,7 +424,11 @@ public open class TcpNetworkBinding(
output.flush()
socket.close()

networkBinding.hardResetEcuFor(this@ActiveConnection, e.ecu.config.logicalAddress, e.duration)
networkBinding.hardResetEcuFor(
this@ActiveConnection,
e.ecu.config.logicalAddress,
e.duration
)
} catch (e: Exception) {
if (!socket.isClosed) {
logger.error(
Expand All @@ -387,6 +445,8 @@ public open class TcpNetworkBinding(
}
}
}
} catch (_: ClosedReceiveChannelException) {
logger.info("Connection closed by remote ${socket.remoteAddress}")
} catch (e: Throwable) {
logger.error("Unknown error inside socket processing loop, closing socket", e)
} finally {
Expand Down
18 changes: 15 additions & 3 deletions src/main/kotlin/NetworkManager.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import io.ktor.network.sockets.SocketAddress
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import library.DoipEntity
import library.DoipTcpConnectionMessageHandler
import library.DoipTcpSocket
Expand Down Expand Up @@ -110,7 +113,6 @@ public open class NetworkManager(
if (config.bindOnAnyForUdpAdditional && !map.containsKey("0.0.0.0")) {
val unb = createUdpNetworkBindingAny()
unb.start()
udpNetworkBindings.add(unb)
}

// TCP
Expand All @@ -133,8 +135,18 @@ public open class NetworkManager(
): UdpNetworkBinding =
UdpNetworkBinding(address, config.localPort, config.broadcastEnable, config.broadcastAddress, entities)

protected open fun createUdpNetworkBindingAny(): UdpNetworkBinding =
UdpNetworkBinding("0.0.0.0", config.localPort, config.broadcastEnable, config.broadcastAddress, doipEntities)
protected open fun createUdpNetworkBindingAny(): UdpNetworkBindingAny =
UdpNetworkBindingAny(config.localPort, ::sendVirReply)

protected open fun sendVirReply(socket: SocketAddress) {
runBlocking(Dispatchers.IO) {
runBlocking {
udpNetworkBindings.forEach {
it.sendVirReply(socket)
}
}
}
}

public open fun createTcpConnectionMessageHandler(doipEntities: List<DoipEntity<*>>, socket: DoipTcpSocket, tlsOptions: TlsOptions?): DoipTcpConnectionMessageHandler =
GroupDoipTcpConnectionMessageHandler(doipEntities, socket, tlsOptions)
Expand Down
7 changes: 7 additions & 0 deletions src/main/kotlin/SimNetworking.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import library.*
import org.slf4j.LoggerFactory

public enum class NetworkMode {
AUTO,
Expand Down Expand Up @@ -71,6 +72,8 @@ public open class SimDoipNetworking(data: NetworkingData) : SimNetworking<SimEcu
}

public abstract class SimNetworking<E : SimulatedEcu, out T : DoipEntity<E>>(public val data: NetworkingData) {
private val log = LoggerFactory.getLogger(SimNetworking::class.java)

public val doipEntities: List<T>
get() {
if (data.doipEntities.isNotEmpty() && _doipEntities.isEmpty()) {
Expand All @@ -83,6 +86,10 @@ public abstract class SimNetworking<E : SimulatedEcu, out T : DoipEntity<E>>(pub
protected val _vams: MutableList<DoipUdpVehicleAnnouncementMessage> = mutableListOf()

protected fun addEntity(doipEntity: @UnsafeVariance T) {
if (_doipEntities.any { it.config.logicalAddress == doipEntity.config.logicalAddress }) {
log.error("Can't add '${doipEntity.name}' - entity with same logical address already exists")
return
}
_doipEntities.add(doipEntity)
}

Expand Down

0 comments on commit d34a08e

Please sign in to comment.