Skip to content

Commit

Permalink
Merge pull request #49 from compscidr/jason/tcp-session-close
Browse files Browse the repository at this point in the history
Tcp Session Close
  • Loading branch information
compscidr authored Dec 2, 2024
2 parents 91d7630 + 2e22837 commit 65b521f
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 19 deletions.
11 changes: 8 additions & 3 deletions core/src/main/kotlin/com/jasonernst/kanonproxy/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ abstract class Session(
val currentTime = System.currentTimeMillis()
val difference = currentTime - session.connectTime
if (difference > STALE_SESSION_MS) {
val error = "Timed trying to reach remote out on TCP connect"
val error = "Timed out trying to reach remote on TCP connect"
logger.error(error)
handleExceptionOnRemoteChannel(Exception(error))
selector.close()
break
}
}
}
Expand Down Expand Up @@ -204,7 +206,9 @@ abstract class Session(
it.interestOps(SelectionKey.OP_READ)
}
if (it.isReadable && it.isValid) {
read()
if (!read()) {
it.interestOps(SelectionKey.OP_READ.inv())
}
}
if (it.isConnectable) {
val socketChannel = it.channel() as SocketChannel
Expand Down Expand Up @@ -254,7 +258,8 @@ abstract class Session(
return len
}

abstract fun read()
// should return false if the read failed and we should unsub from reads
abstract fun read(): Boolean

fun handleExceptionOnRemoteChannel(e: Exception) {
logger.error("Error creating session $this : ${e.message}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class AnonymousTcpSession(
}
}

override fun read() {
override fun read(): Boolean {
try {
val maxRead = tcpStateMachine.availableOutgoingBufferSpace()
val len =
Expand All @@ -130,6 +130,7 @@ class AnonymousTcpSession(
returnQueue.add(finPacket)
tcpStateMachine.enqueueRetransmit(finPacket)
}
return false
}
} catch (e: Exception) {
logger.warn("Remote Tcp channel closed ${e.message}")
Expand All @@ -138,6 +139,8 @@ class AnonymousTcpSession(
returnQueue.add(finPacket)
tcpStateMachine.enqueueRetransmit(finPacket)
}
return false
}
return true
}
}
12 changes: 8 additions & 4 deletions core/src/main/kotlin/com/jasonernst/kanonproxy/tcp/TcpSession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,16 @@ abstract class TcpSession(
swapSourceAndDestination: Boolean = true,
requiresLock: Boolean,
): Packet? {
if (tcpStateMachine.tcpState.value == TcpState.CLOSED || tcpStateMachine.tcpState.value == TcpState.TIME_WAIT) {
// prevent going into all this if we're already closed
return null
}
val packet =
runBlocking {
if (requiresLock) {
logger.warn("TCP TEARDOWN CALLED, WAITING FOR LOCK")
// logger.warn("TCP TEARDOWN CALLED, WAITING FOR LOCK")
tcpStateMachine.tcbMutex.lock()
logger.warn("TCP TEARDOWN LOCK ACQUIRED")
// logger.warn("TCP TEARDOWN LOCK ACQUIRED")
}

logger.debug(
Expand All @@ -89,7 +93,7 @@ abstract class TcpSession(
tearDownPending.set(true)
return@runBlocking null
} else {
logger.debug("No outgoing bytes to send, proceeding with TEARDOWN")
// logger.debug("No outgoing bytes to send, proceeding with TEARDOWN")
}

if (tcpStateMachine.transmissionControlBlock == null) {
Expand Down Expand Up @@ -154,7 +158,7 @@ abstract class TcpSession(
}
if (requiresLock) {
tcpStateMachine.tcbMutex.unlock()
logger.warn("TCP TEARDOWN LOCK RELEASED")
// logger.warn("TCP TEARDOWN LOCK RELEASED")
}
return packet
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class UdpSession(
}
}

override fun read() {
override fun read(): Boolean {
var closed = false
try {
val len = handleReturnTrafficLoop(readBuffer.capacity())
Expand All @@ -76,7 +76,9 @@ class UdpSession(
}
if (closed) {
logger.warn("Remote Udp channel closed")
return false
}
return true
}

override fun handlePayloadFromInternet(payload: ByteArray) {
Expand Down
31 changes: 21 additions & 10 deletions core/src/test/kotlin/com/jasonernst/kanonproxy/tcp/TcpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ class TcpClient(
waitForTimeWait: Boolean = false,
timeOutMs: Long = 2000,
) {
if (tcpStateMachine.tcpState.value == TcpState.CLOSED ||
(tcpStateMachine.tcpState.value == TcpState.TIME_WAIT && !waitForTimeWait)
) {
logger.debug("Already closed, state: ${tcpStateMachine.tcpState.value}")
cleanup(false)
return
}
// send the FIN
val finPacket = super.teardown(false, true)
if (finPacket != null) {
Expand Down Expand Up @@ -311,6 +318,19 @@ class TcpClient(
}
// give a little extra time for the ACK for the FIN to from the other side to be enqueued and sent out
Thread.sleep(100)
cleanup(waitForTimeWait)
if (waitForTimeWait) {
if (tcpStateMachine.tcpState.value != TcpState.CLOSED) {
throw RuntimeException("Failed to close, state: ${tcpStateMachine.tcpState.value}")
}
} else {
if (tcpStateMachine.tcpState.value != TcpState.TIME_WAIT && tcpStateMachine.tcpState.value != TcpState.CLOSED) {
throw RuntimeException("Failed to close, state: ${tcpStateMachine.tcpState.value}")
}
}
}

private fun cleanup(waitForTimeWait: Boolean) {
val session = this
runBlocking {
isRunning.set(false)
Expand All @@ -324,23 +344,14 @@ class TcpClient(
writeJob.cancelAndJoin()
logger.debug("Jobs finished")
}
if (waitForTimeWait) {
if (tcpStateMachine.tcpState.value != TcpState.CLOSED) {
throw RuntimeException("Failed to close, state: ${tcpStateMachine.tcpState.value}")
}
} else {
if (tcpStateMachine.tcpState.value != TcpState.TIME_WAIT && tcpStateMachine.tcpState.value != TcpState.CLOSED) {
throw RuntimeException("Failed to close, state: ${tcpStateMachine.tcpState.value}")
}
}
}

override fun toString(): String =
"TcpClient(sourceAddress='$sourceAddress', destinationAddress='$destinationAddress', sourcePort=$sourcePort, destinationPort=$destinationPort, clientId=$clientId)"

override fun getKey(): String = getKey(sourceAddress, sourcePort, destinationAddress, destinationPort, IpType.TCP.value)

override fun read() {
override fun read(): Boolean {
TODO("Not yet implemented")
}

Expand Down

0 comments on commit 65b521f

Please sign in to comment.