Skip to content

Commit

Permalink
bugfixes, readd stop for reset functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
froks committed Sep 26, 2024
1 parent 83f1f9e commit f22d2e7
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 96 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ plugins {
apply<NexusReleasePlugin>()

group = "io.github.doip-sim-ecu"
version = "0.14.1"
version = "0.14.2"

repositories {
gradlePluginPortal()
Expand Down
198 changes: 124 additions & 74 deletions src/main/kotlin/NetworkHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.slf4j.MDC
import java.net.InetAddress
import java.net.SocketException
import java.nio.file.Paths
import java.util.Collections
import javax.net.ssl.SSLServerSocket
import javax.net.ssl.SSLSocket
import kotlin.collections.component1
Expand All @@ -36,13 +37,13 @@ public open class UdpNetworkBinding(

private val udpMessageHandlers = doipEntities.associateWith { it.createDoipUdpMessageHandler() }

protected open suspend fun startVamTimer(socket: BoundDatagramSocket) {
protected open suspend fun startVamTimer(socket: BoundDatagramSocket, doipEntitiesFilter: List<DoipEntity<*>>? = null) {
if (broadcastEnabled) {
sendVams(socket)
sendVams(socket, doipEntitiesFilter)
}
}

protected open suspend fun sendVams(socket: BoundDatagramSocket) {
protected open suspend fun sendVams(socket: BoundDatagramSocket, doipEntitiesFilter: List<DoipEntity<*>>? = null) {
var vamSentCounter = 0

val entries = doipEntities.associateWith { it.generateVehicleAnnouncementMessages() }
Expand All @@ -55,6 +56,9 @@ public open class UdpNetworkBinding(
entries.forEach { (doipEntity, vams) ->
MDC.put("ecu", doipEntity.name)
vams.forEach { vam ->
if (doipEntitiesFilter != null && doipEntitiesFilter.none { vam.logicalAddress == it.config.logicalAddress }) {
return@forEach
}
logger.info("Sending VAM for ${vam.logicalAddress.toByteArray().toHexString()} as broadcast")
runBlocking(Dispatchers.IO) {
launch(MDCContext()) {
Expand All @@ -73,6 +77,10 @@ public open class UdpNetworkBinding(
}
}

public suspend fun resendVams(doipEntitiesFilter: List<DoipEntity<*>>? = null) {
startVamTimer(udpServerSocket, doipEntitiesFilter)
}

public fun start() {
thread(name = "UDP") {
runBlocking {
Expand Down Expand Up @@ -146,28 +154,53 @@ public open class TcpNetworkBinding(

private val serverSockets: MutableList<ServerSocket> = mutableListOf()
private val activeConnections: MutableMap<ActiveConnection, DoipEntity<*>> = mutableMapOf()
private val hardResettingEcus: MutableSet<Short> = Collections.synchronizedSet<Short>(mutableSetOf())

public fun isEcuHardResetting(targetAddress: Short): Boolean =
hardResettingEcus.contains(targetAddress)

public fun hardResetEcuFor(activeConnection: ActiveConnection, logicalAddress: Short, duration: kotlin.time.Duration) {
val isDoipEntity = doipEntities.any { it.config.logicalAddress == logicalAddress }

// public fun pauseTcpServerSockets(duration: kotlin.time.Duration) {
// logger.warn("Closing serversockets")
// serverSockets.forEach {
// try {
// it.close()
// } catch (ignored: Exception) {
// }
// }
// serverSockets.clear()
// logger.warn("Pausing server sockets for ${duration.inWholeMilliseconds} ms")
// Thread.sleep(duration.inWholeMilliseconds)
// logger.warn("Restarting server sockets after ${duration.inWholeMilliseconds} ms")
// runBlocking {
// launch {
// startVamTimer(udpServerSocket)
// }
// launch {
// start()
// }
// }
// }
if (isDoipEntity) {
logger.info("Closing serversockets")
serverSockets.forEach {
try {
it.close()
} catch (_: Exception) {
// ignored
}
}
logger.info("Closing active connections")
activeConnections.forEach {
try {
it.key.close()
} catch (_: Exception) {
// ignored
}
}
serverSockets.clear()
}

hardResettingEcus.add(logicalAddress)

logger.warn("Pausing server sockets for ${duration.inWholeMilliseconds} ms")
Thread.sleep(duration.inWholeMilliseconds)

hardResettingEcus.remove(logicalAddress)

if (isDoipEntity) {
logger.warn("Restarting server sockets after ${duration.inWholeMilliseconds} ms")
runBlocking {
launch {
start()
}
launch {
networkManager.resendVams(doipEntities)
}
}
}
}

public fun start() {
thread(name = "TCP") {
Expand All @@ -182,7 +215,7 @@ public open class TcpNetworkBinding(
while (!serverSocket.isClosed) {
val socket = serverSocket.accept()
val activeConnection = ActiveConnection(networkManager, this@TcpNetworkBinding, doipEntities)
activeConnection.handleTcpSocket(this@withContext, DelegatedKtorSocket(socket), null)
activeConnection.handleTcpSocket(this@withContext, DelegatedKtorSocket(socket))
}
}
}
Expand Down Expand Up @@ -249,7 +282,7 @@ public open class TcpNetworkBinding(
while (!tlsServerSocket.isClosed) {
val socket = tlsServerSocket.accept() as SSLSocket
val activeConnection = ActiveConnection(networkManager, this@TcpNetworkBinding, doipEntities)
activeConnection.handleTcpSocket(this, SSLDoipTcpSocket(socket), null)
activeConnection.handleTcpSocket(this, SSLDoipTcpSocket(socket))
}
}
}
Expand All @@ -259,15 +292,32 @@ public open class TcpNetworkBinding(
public open class ActiveConnection(
private val networkManager: NetworkManager,
private val networkBinding: TcpNetworkBinding,
private val doipEntities: List<DoipEntity<*>>
private val doipEntities: List<DoipEntity<*>>,
) {
private val logger = LoggerFactory.getLogger(ActiveConnection::class.java)
private var socket: DoipTcpSocket? = null
private var closed: Boolean = false

public open fun close() {
socket?.close()
closed = true
}

protected open suspend fun sendDoipAck(message: DoipTcpDiagMessage, output: ByteWriteChannel) {
val ack = DoipTcpDiagMessagePosAck(
message.targetAddress,
message.sourceAddress,
0x00
)
output.writeFully(ack.asByteArray)
}

public open suspend fun handleTcpSocket(
scope: CoroutineScope,
socket: DoipTcpSocket,
disableServerSocketCallback: ((kotlin.time.Duration) -> Unit)?
socket: DoipTcpSocket
) {
this.socket = socket

scope.launch(Dispatchers.IO) {
val handler = networkManager.createTcpConnectionMessageHandler(doipEntities, socket, networkBinding.tlsOptions)

Expand All @@ -278,62 +328,62 @@ public open class TcpNetworkBinding(
val output = socket.openWriteChannel()
try {
val parser = DoipTcpMessageParser(doipEntities.first().config.maxDataSize - 8)
while (!socket.isClosed) {
while (!socket.isClosed && !closed) {
val message = parser.parseDoipTcpMessage(input)

runBlocking {
try {
MDC.put("ecu", entity.name)
try {
MDC.put("ecu", entity.name)
if (message is DoipTcpDiagMessage && networkBinding.isEcuHardResetting(message.targetAddress)) {
sendDoipAck(message, output)
} else {
handler.handleTcpMessage(message, output)
} catch (e: ClosedReceiveChannelException) {
// ignore - socket was closed
logger.debugIf { "Socket was closed by remote ${socket.remoteAddress}" }
}
} catch (e: ClosedReceiveChannelException) {
// ignore - socket was closed
logger.debugIf { "Socket was closed by remote ${socket.remoteAddress}" }
withContext(Dispatchers.IO) {
handler.connectionClosed(e)
socket.runCatching { this.close() }
}
} catch (e: SocketException) {
logger.error("Socket error: ${e.message} -> closing socket")
withContext(Dispatchers.IO) {
handler.connectionClosed(e)
socket.runCatching { this.close() }
}
} catch (e: HeaderNegAckException) {
if (!socket.isClosed) {
logger.debug(
"Error in Header while parsing message, sending negative acknowledgment",
e
)
val response =
DoipTcpHeaderNegAck(DoipTcpDiagMessageNegAck.NACK_CODE_TRANSPORT_PROTOCOL_ERROR).asByteArray
output.writeFully(response)
withContext(Dispatchers.IO) {
handler.connectionClosed(e)
socket.runCatching { this.close() }
}
} catch (e: SocketException) {
logger.error("Socket error: ${e.message} -> closing socket")
}
} catch (e: DoipEntityHardResetException) {
logger.warn("Simulating Hard Reset on ${e.ecu.name} for ${e.duration.inWholeMilliseconds} ms")
output.flush()
socket.close()

networkBinding.hardResetEcuFor(this@ActiveConnection, e.ecu.config.logicalAddress, e.duration)
} catch (e: Exception) {
if (!socket.isClosed) {
logger.error(
"Unknown error parsing/handling message, sending negative acknowledgment",
e
)
val response =
DoipTcpHeaderNegAck(DoipTcpDiagMessageNegAck.NACK_CODE_TRANSPORT_PROTOCOL_ERROR).asByteArray
output.writeFully(response)
withContext(Dispatchers.IO) {
handler.connectionClosed(e)
socket.runCatching { this.close() }
}
} catch (e: HeaderNegAckException) {
if (!socket.isClosed) {
logger.debug(
"Error in Header while parsing message, sending negative acknowledgment",
e
)
val response =
DoipTcpHeaderNegAck(DoipTcpDiagMessageNegAck.NACK_CODE_TRANSPORT_PROTOCOL_ERROR).asByteArray
output.writeFully(response)
withContext(Dispatchers.IO) {
handler.connectionClosed(e)
socket.runCatching { this.close() }
}
}
} catch (e: DoipEntityHardResetException) {
logger.warn("Simulating Hard Reset on ${entity.name} for ${e.duration.inWholeMilliseconds} ms")
output.flush()
socket.close()

if (disableServerSocketCallback != null) {
disableServerSocketCallback(e.duration)
}
} catch (e: Exception) {
if (!socket.isClosed) {
logger.error(
"Unknown error parsing/handling message, sending negative acknowledgment",
e
)
val response =
DoipTcpHeaderNegAck(DoipTcpDiagMessageNegAck.NACK_CODE_TRANSPORT_PROTOCOL_ERROR).asByteArray
output.writeFully(response)
withContext(Dispatchers.IO) {
handler.connectionClosed(e)
socket.runCatching { this.close() }
}
}
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/main/kotlin/NetworkManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public open class NetworkManager(
public val doipEntities: List<DoipEntity<*>>,
) {
private val log = LoggerFactory.getLogger(NetworkManager::class.java)
private val udpNetworkBindings: MutableList<UdpNetworkBinding> = mutableListOf()
private val tcpNetworkBindings: MutableList<TcpNetworkBinding> = mutableListOf()

protected open fun findInterfaceByName(): NetworkInterface? {
var foundInterface: NetworkInterface? = null
Expand Down Expand Up @@ -102,17 +104,20 @@ public open class NetworkManager(
map.forEach { (address, entities) ->
val unb = createUdpNetworkBinding(address, entities)
unb.start()
udpNetworkBindings.add(unb)
}

if (config.bindOnAnyForUdpAdditional && !map.containsKey("0.0.0.0")) {
val unb = createUdpNetworkBindingAny()
unb.start()
udpNetworkBindings.add(unb)
}

// TCP
map.forEach { (address, entities) ->
val tnb = createTcpNetworkBinding(address, entities)
tnb.start()
tcpNetworkBindings.add(tnb)
}
}

Expand All @@ -133,5 +138,11 @@ public open class NetworkManager(

public open fun createTcpConnectionMessageHandler(doipEntities: List<DoipEntity<*>>, socket: DoipTcpSocket, tlsOptions: TlsOptions?): DoipTcpConnectionMessageHandler =
GroupDoipTcpConnectionMessageHandler(doipEntities, socket, tlsOptions)

public open suspend fun resendVams(entities: List<DoipEntity<*>>) {
udpNetworkBindings.forEach {
it.resendVams(entities)
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ public open class DefaultDoipEntityTcpConnectionMessageHandler(
private val logger: Logger = LoggerFactory.getLogger(DefaultDoipEntityTcpConnectionMessageHandler::class.java)

override suspend fun handleTcpMessage(message: DoipTcpMessage, output: ByteWriteChannel) {
runBlocking {
MDC.put("ecu", doipEntity.name)
launch(MDCContext()) {
super.handleTcpMessage(message, output)
}
}
MDC.put("ecu", doipEntity.name)
super.handleTcpMessage(message, output)
}

override suspend fun handleTcpRoutingActivationRequest(
Expand Down
18 changes: 5 additions & 13 deletions src/main/kotlin/library/DoipEntity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,16 @@ public abstract class DoipEntity<out T : SimulatedEcu>(
override suspend fun onIncomingDiagMessage(diagMessage: DoipTcpDiagMessage, output: ByteWriteChannel) {
val ecu = targetEcusByLogical[diagMessage.targetAddress]
ecu?.run {
runBlocking {
MDC.put("ecu", ecu.name)
launch(MDCContext()) {
onIncomingUdsMessage(diagMessage.toUdsMessage(UdsMessage.PHYSICAL, output, ecu.config.logicalAddress))
}
}
MDC.put("ecu", ecu.name)
onIncomingUdsMessage(diagMessage.toUdsMessage(UdsMessage.PHYSICAL, output, ecu.config.logicalAddress))
// Exit if the target ecu was found by physical
return
}

val ecus = targetEcusByFunctional[diagMessage.targetAddress]
ecus?.forEach {
runBlocking {
MDC.put("ecu", it.name)
launch(MDCContext()) {
it.onIncomingUdsMessage(diagMessage.toUdsMessage(UdsMessage.FUNCTIONAL, output, it.config.logicalAddress))
}
}
ecus?.forEach { ecu ->
MDC.put("ecu", ecu.name)
ecu.onIncomingUdsMessage(diagMessage.toUdsMessage(UdsMessage.FUNCTIONAL, output, ecu.config.logicalAddress))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public open class GroupDoipTcpConnectionMessageHandler(
diagMessage: DoipTcpDiagMessage,
output: ByteWriteChannel
) {
val handler = list.firstOrNull { it.existsTargetAddress(diagMessage.targetAddress) } ?: list.first()
handler.onIncomingDiagMessage(diagMessage, output)
val handler = list.filter { it.existsTargetAddress(diagMessage.targetAddress) }
handler.forEach { it.onIncomingDiagMessage(diagMessage, output) }
}
}
}

0 comments on commit f22d2e7

Please sign in to comment.