From c3d07e1574ddea3e4a2e4fc09041de890c90f831 Mon Sep 17 00:00:00 2001 From: Erwin Kok Date: Thu, 25 Jan 2024 19:13:46 +0100 Subject: [PATCH] Yamux update --- .../erwinkok/libp2p/muxer/yamux/Session.kt | 28 ++++----- .../libp2p/muxer/yamux/YamuxMuxedStream.kt | 19 +++--- .../{MplexStreamId.kt => YamuxStreamId.kt} | 4 +- .../libp2p/muxer/yamux/frame/CloseFrame.kt | 6 +- .../libp2p/muxer/yamux/frame/Frame.kt | 16 ++--- .../libp2p/muxer/yamux/frame/MessageFrame.kt | 6 +- .../muxer/yamux/frame/NewStreamFrame.kt | 4 +- .../libp2p/muxer/yamux/frame/ResetFrame.kt | 6 +- .../muxer/yamux/YamuxMultiplexerTest.kt | 20 +++--- .../muxer/yamux/YamuxMuxedStreamTest.kt | 62 +++++++++---------- 10 files changed, 84 insertions(+), 87 deletions(-) rename libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/{MplexStreamId.kt => YamuxStreamId.kt} (85%) diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/Session.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/Session.kt index 37aa0d7..9aba100 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/Session.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/Session.kt @@ -128,7 +128,7 @@ class Session( private val streamChannel = SafeChannel(16) private val outputChannel = SafeChannel(16) private val mutex = ReentrantLock() - private val streams = mutableMapOf() + private val streams = mutableMapOf() private val nextId = AtomicLong(0) private val isClosing = AtomicBoolean(false) private var closeCause: Error? = null @@ -172,7 +172,7 @@ class Session( _context.complete() } - internal fun removeStream(streamId: MplexStreamId) { + internal fun removeStream(streamId: YamuxStreamId) { mutex.withLock { streams.remove(streamId) if (isClosing.get() && streams.isEmpty()) { @@ -234,9 +234,9 @@ class Session( private suspend fun processFrame(mplexFrame: Frame) { val id = mplexFrame.id val initiator = mplexFrame.initiator - val mplexStreamId = MplexStreamId(!initiator, id) + val yamuxStreamId = YamuxStreamId(!initiator, id) mutex.lock() - val stream: YamuxMuxedStream? = streams[mplexStreamId] + val stream: YamuxMuxedStream? = streams[yamuxStreamId] when (mplexFrame) { is NewStreamFrame -> { if (stream != null) { @@ -244,9 +244,9 @@ class Session( logger.warn { "$this: Remote creates existing new stream: $id. Ignoring." } } else { logger.debug { "$this: Remote creates new stream: $id" } - val name = streamName(mplexFrame.name, mplexStreamId) - val newStream = YamuxMuxedStream(scope, this, outputChannel, mplexStreamId, name, pool) - streams[mplexStreamId] = newStream + val name = streamName(mplexFrame.name, yamuxStreamId) + val newStream = YamuxMuxedStream(scope, this, outputChannel, yamuxStreamId, name, pool) + streams[yamuxStreamId] = newStream mutex.unlock() streamChannel.send(newStream) } @@ -271,21 +271,21 @@ class Session( stream.remoteSendsNewMessage(builder.build()) } if (timeout == null) { - logger.warn { "$this: Reader timeout for stream: $mplexStreamId. Reader is too slow, resetting the stream." } + logger.warn { "$this: Reader timeout for stream: $yamuxStreamId. Reader is too slow, resetting the stream." } stream.reset() } } else { mutex.unlock() - logger.warn { "$this: Remote sends message on non-existing stream: $mplexStreamId" } + logger.warn { "$this: Remote sends message on non-existing stream: $yamuxStreamId" } } } is CloseFrame -> { if (logger.isDebugEnabled) { if (initiator) { - logger.debug("$this: Remote closes his stream: $mplexStreamId") + logger.debug("$this: Remote closes his stream: $yamuxStreamId") } else { - logger.debug("$this: Remote closes our stream: $mplexStreamId") + logger.debug("$this: Remote closes our stream: $yamuxStreamId") } } if (stream != null) { @@ -293,7 +293,7 @@ class Session( stream.remoteClosesWriting() } else { mutex.unlock() - logger.debug { "$this: Remote closes non-existing stream: $mplexStreamId" } + logger.debug { "$this: Remote closes non-existing stream: $yamuxStreamId" } } } @@ -323,7 +323,7 @@ class Session( } mutex.lock() val id = nextId.getAndIncrement() - val streamId = MplexStreamId(true, id) + val streamId = YamuxStreamId(true, id) logger.debug { "$this: We create stream: $id" } val name = streamName(newName, streamId) val muxedStream = YamuxMuxedStream(scope, this, outputChannel, streamId, name, pool) @@ -333,7 +333,7 @@ class Session( return Ok(muxedStream) } - private fun streamName(name: String?, streamId: MplexStreamId): String { + private fun streamName(name: String?, streamId: YamuxStreamId): String { if (name != null) { return name } diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStream.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStream.kt index b1c5f04..4d43bc3 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStream.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStream.kt @@ -56,7 +56,7 @@ class YamuxMuxedStream( private val scope: CoroutineScope, private val session: Session, private val outputChannel: Channel, - private val mplexStreamId: MplexStreamId, + private val yamuxStreamId: YamuxStreamId, override val name: String, override val pool: ObjectPool ) : MuxedStream { @@ -64,9 +64,6 @@ class YamuxMuxedStream( // // memorySpan MemoryManager // -// id uint32 -// session *Session -// // recvWindow uint32 // epochStart time.Time // @@ -87,7 +84,7 @@ class YamuxMuxedStream( private val readerJob: ReaderJob override val id - get() = mplexStreamId.toString() + get() = yamuxStreamId.toString() override val jobContext: Job get() = _context @@ -100,7 +97,7 @@ class YamuxMuxedStream( }.apply { invokeOnCompletion { if (readerJob.isCompleted) { - session.removeStream(mplexStreamId) + session.removeStream(yamuxStreamId) } } } @@ -111,7 +108,7 @@ class YamuxMuxedStream( }.apply { invokeOnCompletion { if (writerJob.isCompleted) { - session.removeStream(mplexStreamId) + session.removeStream(yamuxStreamId) } } } @@ -143,7 +140,7 @@ class YamuxMuxedStream( if (size > 0) { buffer.flip() val packet = buildPacket(pool) { writeFully(buffer) } - val messageFrame = MessageFrame(mplexStreamId, packet) + val messageFrame = MessageFrame(yamuxStreamId, packet) outputChannel.send(messageFrame) } } catch (e: CancellationException) { @@ -160,9 +157,9 @@ class YamuxMuxedStream( } if (!outputChannel.isClosedForSend) { if (channel.closedCause is StreamResetException) { - outputChannel.send(ResetFrame(mplexStreamId)) + outputChannel.send(ResetFrame(yamuxStreamId)) } else { - outputChannel.send(CloseFrame(mplexStreamId)) + outputChannel.send(CloseFrame(yamuxStreamId)) } } } @@ -182,7 +179,7 @@ class YamuxMuxedStream( } override fun toString(): String { - return "mplex-<$mplexStreamId>" + return "mplex-<$yamuxStreamId>" } internal suspend fun remoteSendsNewMessage(packet: ByteReadPacket): Boolean { diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/MplexStreamId.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxStreamId.kt similarity index 85% rename from libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/MplexStreamId.kt rename to libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxStreamId.kt index 4a47773..97166e7 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/MplexStreamId.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxStreamId.kt @@ -1,7 +1,7 @@ // Copyright (c) 2024 Erwin Kok. BSD-3-Clause license. See LICENSE file for more details. package org.erwinkok.libp2p.muxer.yamux -data class MplexStreamId(val initiator: Boolean, val id: Long) { +data class YamuxStreamId(val initiator: Boolean, val id: Long) { override fun toString(): String { return String.format("stream%08x/%s", id, if (initiator) "initiator" else "responder") } @@ -10,7 +10,7 @@ data class MplexStreamId(val initiator: Boolean, val id: Long) { if (other === this) { return true } - if (other !is MplexStreamId) { + if (other !is YamuxStreamId) { return super.equals(other) } return (id == other.id) and diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/CloseFrame.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/CloseFrame.kt index 145f60b..ec2f75a 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/CloseFrame.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/CloseFrame.kt @@ -5,13 +5,13 @@ import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.ByteWriteChannel import org.erwinkok.libp2p.core.network.readUnsignedVarInt import org.erwinkok.libp2p.core.network.writeUnsignedVarInt -import org.erwinkok.libp2p.muxer.yamux.MplexStreamId +import org.erwinkok.libp2p.muxer.yamux.YamuxStreamId import org.erwinkok.result.Error import org.erwinkok.result.Result import org.erwinkok.result.map import org.erwinkok.result.toErrorIf -internal class CloseFrame(streamId: MplexStreamId) : Frame(streamId) { +internal class CloseFrame(streamId: YamuxStreamId) : Frame(streamId) { override val type: Int get() { return if (streamId.initiator) CloseInitiatorTag else CloseReceiverTag @@ -24,7 +24,7 @@ internal class CloseFrame(streamId: MplexStreamId) : Frame(streamId) { } } -internal suspend fun ByteReadChannel.readCloseFrame(streamId: MplexStreamId): Result { +internal suspend fun ByteReadChannel.readCloseFrame(streamId: YamuxStreamId): Result { return readUnsignedVarInt() .toErrorIf({ it != 0uL }, { Error("CloseFrame should not carry data") }) .map { CloseFrame(streamId) } diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/Frame.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/Frame.kt index 02eb2f3..3386a0b 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/Frame.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/Frame.kt @@ -6,12 +6,12 @@ import io.ktor.utils.io.ByteWriteChannel import io.ktor.utils.io.core.Closeable import org.erwinkok.libp2p.core.network.readUnsignedVarInt import org.erwinkok.libp2p.core.network.writeUnsignedVarInt -import org.erwinkok.libp2p.muxer.yamux.MplexStreamId +import org.erwinkok.libp2p.muxer.yamux.YamuxStreamId import org.erwinkok.result.Err import org.erwinkok.result.Result import org.erwinkok.result.getOrElse -sealed class Frame(val streamId: MplexStreamId) : Closeable { +sealed class Frame(val streamId: YamuxStreamId) : Closeable { val initiator: Boolean get() = streamId.initiator val id: Long get() = streamId.id abstract val type: Int @@ -41,12 +41,12 @@ internal suspend fun ByteReadChannel.readMplexFrame(): Result { val id = (header shr 3).toLong() return when (tag) { Frame.NewStreamTag -> readNewStreamFrame(id) - Frame.MessageReceiverTag -> readMessageFrame(MplexStreamId(false, id)) - Frame.MessageInitiatorTag -> readMessageFrame(MplexStreamId(true, id)) - Frame.CloseReceiverTag -> readCloseFrame(MplexStreamId(false, id)) - Frame.CloseInitiatorTag -> readCloseFrame(MplexStreamId(true, id)) - Frame.ResetReceiverTag -> readResetFrame(MplexStreamId(false, id)) - Frame.ResetInitiatorTag -> readResetFrame(MplexStreamId(true, id)) + Frame.MessageReceiverTag -> readMessageFrame(YamuxStreamId(false, id)) + Frame.MessageInitiatorTag -> readMessageFrame(YamuxStreamId(true, id)) + Frame.CloseReceiverTag -> readCloseFrame(YamuxStreamId(false, id)) + Frame.CloseInitiatorTag -> readCloseFrame(YamuxStreamId(true, id)) + Frame.ResetReceiverTag -> readResetFrame(YamuxStreamId(false, id)) + Frame.ResetInitiatorTag -> readResetFrame(YamuxStreamId(true, id)) else -> Err("Unknown Mplex tag type '$tag'") } } diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/MessageFrame.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/MessageFrame.kt index 028537d..4be96f3 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/MessageFrame.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/MessageFrame.kt @@ -6,11 +6,11 @@ import io.ktor.utils.io.ByteWriteChannel import io.ktor.utils.io.core.ByteReadPacket import org.erwinkok.libp2p.core.network.readUnsignedVarInt import org.erwinkok.libp2p.core.network.writeUnsignedVarInt -import org.erwinkok.libp2p.muxer.yamux.MplexStreamId +import org.erwinkok.libp2p.muxer.yamux.YamuxStreamId import org.erwinkok.result.Result import org.erwinkok.result.map -internal class MessageFrame(streamId: MplexStreamId, val packet: ByteReadPacket) : Frame(streamId) { +internal class MessageFrame(streamId: YamuxStreamId, val packet: ByteReadPacket) : Frame(streamId) { override val type: Int get() { return if (streamId.initiator) MessageInitiatorTag else MessageReceiverTag @@ -26,7 +26,7 @@ internal class MessageFrame(streamId: MplexStreamId, val packet: ByteReadPacket) } } -internal suspend fun ByteReadChannel.readMessageFrame(streamId: MplexStreamId): Result { +internal suspend fun ByteReadChannel.readMessageFrame(streamId: YamuxStreamId): Result { return readUnsignedVarInt() .map { length -> readPacket(length.toInt()) } .map { packet -> MessageFrame(streamId, packet) } diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/NewStreamFrame.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/NewStreamFrame.kt index d027921..162c047 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/NewStreamFrame.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/NewStreamFrame.kt @@ -8,11 +8,11 @@ import io.ktor.utils.io.core.toByteArray import io.ktor.utils.io.writeFully import org.erwinkok.libp2p.core.network.readUnsignedVarInt import org.erwinkok.libp2p.core.network.writeUnsignedVarInt -import org.erwinkok.libp2p.muxer.yamux.MplexStreamId +import org.erwinkok.libp2p.muxer.yamux.YamuxStreamId import org.erwinkok.result.Result import org.erwinkok.result.map -internal class NewStreamFrame(id: Long, val name: String) : Frame(MplexStreamId(true, id)) { +internal class NewStreamFrame(id: Long, val name: String) : Frame(YamuxStreamId(true, id)) { override val type: Int get() = NewStreamTag override fun close(): Unit = Unit diff --git a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/ResetFrame.kt b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/ResetFrame.kt index ad61fac..a9e4a30 100644 --- a/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/ResetFrame.kt +++ b/libp2p-muxer-yamux/src/main/kotlin/org/erwinkok/libp2p/muxer/yamux/frame/ResetFrame.kt @@ -5,13 +5,13 @@ import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.ByteWriteChannel import org.erwinkok.libp2p.core.network.readUnsignedVarInt import org.erwinkok.libp2p.core.network.writeUnsignedVarInt -import org.erwinkok.libp2p.muxer.yamux.MplexStreamId +import org.erwinkok.libp2p.muxer.yamux.YamuxStreamId import org.erwinkok.result.Error import org.erwinkok.result.Result import org.erwinkok.result.map import org.erwinkok.result.toErrorIf -internal class ResetFrame(streamId: MplexStreamId) : Frame(streamId) { +internal class ResetFrame(streamId: YamuxStreamId) : Frame(streamId) { override val type: Int get() { return if (streamId.initiator) ResetInitiatorTag else ResetReceiverTag @@ -24,7 +24,7 @@ internal class ResetFrame(streamId: MplexStreamId) : Frame(streamId) { } } -internal suspend fun ByteReadChannel.readResetFrame(streamId: MplexStreamId): Result { +internal suspend fun ByteReadChannel.readResetFrame(streamId: YamuxStreamId): Result { return readUnsignedVarInt() .toErrorIf({ it != 0uL }, { Error("ResetFrame should not carry data") }) .map { ResetFrame(streamId) } diff --git a/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMultiplexerTest.kt b/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMultiplexerTest.kt index 7f7e0b0..f5c1890 100644 --- a/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMultiplexerTest.kt +++ b/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMultiplexerTest.kt @@ -70,7 +70,7 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { repeat(1000) { val muxedStream = mplexMultiplexer.openStream("newStreamName$it").expectNoErrors() assertEquals("newStreamName$it", muxedStream.name) - assertEquals(MplexStreamId(true, it.toLong()).toString(), muxedStream.id) + assertEquals(YamuxStreamId(true, it.toLong()).toString(), muxedStream.id) val actual = connectionPair.remote.input.readMplexFrame().expectNoErrors() assertInstanceOf(NewStreamFrame::class.java, actual) assertTrue(actual.initiator) @@ -94,7 +94,7 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { assertEquals("aName$id", muxedStream.name) assertStreamHasId(false, id, muxedStream) val random1 = Random.nextBytes(1000) - connectionPair.remote.output.writeMplexFrame(MessageFrame(MplexStreamId(true, id), buildPacket(pool) { writeFully(random1) })) + connectionPair.remote.output.writeMplexFrame(MessageFrame(YamuxStreamId(true, id), buildPacket(pool) { writeFully(random1) })) connectionPair.remote.output.flush() assertFalse(muxedStream.input.isClosedForRead) val random2 = ByteArray(random1.size) @@ -137,7 +137,7 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { repeat(1000) { val muxedStream = mplexMultiplexer.openStream("newStreamName$it").expectNoErrors() assertEquals("newStreamName$it", muxedStream.name) - assertEquals(MplexStreamId(true, it.toLong()).toString(), muxedStream.id) + assertEquals(YamuxStreamId(true, it.toLong()).toString(), muxedStream.id) assertNewStreamFrameReceived(it, "newStreamName$it", connectionPair.remote) val random1 = Random.nextBytes(1000) assertFalse(muxedStream.output.isClosedForWrite) @@ -158,10 +158,10 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { repeat(1000) { val muxedStream = mplexMultiplexer.openStream("newStreamName$it").expectNoErrors() assertEquals("newStreamName$it", muxedStream.name) - assertEquals(MplexStreamId(true, it.toLong()).toString(), muxedStream.id) + assertEquals(YamuxStreamId(true, it.toLong()).toString(), muxedStream.id) assertNewStreamFrameReceived(it, "newStreamName$it", connectionPair.remote) val random1 = Random.nextBytes(1000) - connectionPair.remote.output.writeMplexFrame(MessageFrame(MplexStreamId(false, it.toLong()), buildPacket(pool) { writeFully(random1) })) + connectionPair.remote.output.writeMplexFrame(MessageFrame(YamuxStreamId(false, it.toLong()), buildPacket(pool) { writeFully(random1) })) connectionPair.remote.output.flush() assertFalse(muxedStream.input.isClosedForRead) val random2 = ByteArray(random1.size) @@ -186,7 +186,7 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { assertStreamHasId(false, id, muxedStream) assertFalse(muxedStream.input.isClosedForRead) assertFalse(muxedStream.output.isClosedForWrite) - connectionPair.remote.output.writeMplexFrame(CloseFrame(MplexStreamId(true, id))) + connectionPair.remote.output.writeMplexFrame(CloseFrame(YamuxStreamId(true, id))) connectionPair.remote.output.flush() val exception = assertThrows { muxedStream.input.readPacket(10) @@ -233,7 +233,7 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { repeat(1000) { val muxedStream = mplexMultiplexer.openStream("newStreamName$it").expectNoErrors() assertEquals("newStreamName$it", muxedStream.name) - assertEquals(MplexStreamId(true, it.toLong()).toString(), muxedStream.id) + assertEquals(YamuxStreamId(true, it.toLong()).toString(), muxedStream.id) assertNewStreamFrameReceived(it, "newStreamName$it", connectionPair.remote) muxedStream.output.close() yield() @@ -257,11 +257,11 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { repeat(1000) { val muxedStream = mplexMultiplexer.openStream("newStreamName$it").expectNoErrors() assertEquals("newStreamName$it", muxedStream.name) - assertEquals(MplexStreamId(true, it.toLong()).toString(), muxedStream.id) + assertEquals(YamuxStreamId(true, it.toLong()).toString(), muxedStream.id) assertNewStreamFrameReceived(it, "newStreamName$it", connectionPair.remote) assertFalse(muxedStream.input.isClosedForRead) assertFalse(muxedStream.output.isClosedForWrite) - connectionPair.remote.output.writeMplexFrame(CloseFrame(MplexStreamId(false, it.toLong()))) + connectionPair.remote.output.writeMplexFrame(CloseFrame(YamuxStreamId(false, it.toLong()))) connectionPair.remote.output.flush() val exception = assertThrows { muxedStream.input.readPacket(10) @@ -448,7 +448,7 @@ internal class YamuxMultiplexerTest : TestWithLeakCheck { } private fun assertStreamHasId(initiator: Boolean, id: Long, muxedStream: MuxedStream) { - assertEquals(MplexStreamId(initiator, id).toString(), muxedStream.id) + assertEquals(YamuxStreamId(initiator, id).toString(), muxedStream.id) } private suspend fun assertMessageFrameReceived(expected: ByteArray, connection: Connection) { diff --git a/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStreamTest.kt b/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStreamTest.kt index 2892658..abd19be 100644 --- a/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStreamTest.kt +++ b/libp2p-muxer-yamux/src/test/kotlin/org/erwinkok/libp2p/muxer/yamux/YamuxMuxedStreamTest.kt @@ -46,10 +46,10 @@ import kotlin.time.Duration.Companion.seconds internal class YamuxMuxedStreamTest : TestWithLeakCheck { override val pool = VerifyingChunkBufferPool() - private val mplexStreamId = MplexStreamId(true, 1234) - private val mplexStreamName = "AName" + private val yamuxStreamId = YamuxStreamId(true, 1234) + private val yamuxStreamName = "AName" private val session = mockk() - private val streamIdSlot = slot() + private val streamIdSlot = slot() @BeforeEach fun setup() { @@ -59,9 +59,9 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { @Test fun testIdAndName() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) assertEquals("stream000004d2/initiator", muxedStream.id) - assertEquals(mplexStreamName, muxedStream.name) + assertEquals(yamuxStreamName, muxedStream.name) muxedStream.close() muxedStream.awaitClosed() reader.stop() @@ -71,7 +71,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { @Test fun testInitiallyNothingAvailableForRead() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) assertEquals(0, muxedStream.input.availableForRead) muxedStream.close() muxedStream.awaitClosed() @@ -83,7 +83,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { fun testReadPacket() = runTest { repeat(1000) { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) val random = Random.nextBytes(100000) assertTrue(muxedStream.remoteSendsNewMessage(buildPacket(pool) { writeFully(random) })) val bytes = ByteArray(random.size) @@ -100,7 +100,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { fun testReadPacketSplit() = runTest { repeat(1000) { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) val random = Random.nextBytes(50000) assertTrue(muxedStream.remoteSendsNewMessage(buildPacket(pool) { writeFully(random) })) for (j in 0 until 5) { @@ -119,7 +119,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { fun testReadPacketCombined() = runTest { repeat(1000) { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) val random = Random.nextBytes(50000) for (j in 0 until 5) { val bytes = random.copyOfRange(j * 10000, (j + 1) * 10000) @@ -138,7 +138,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { @Test fun testReadPacketWait() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) val result = withTimeoutOrNull(500) { muxedStream.input.readPacket(10) } @@ -152,7 +152,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { @Test fun testReadPacketAfterCancel() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) muxedStream.input.cancel() yield() // Give the input coroutine a chance to cancel val exception1 = assertThrows { @@ -170,7 +170,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { @Test fun testReadPacketAfterClose() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) assertFalse(muxedStream.input.isClosedForRead) assertFalse(muxedStream.output.isClosedForWrite) muxedStream.close() @@ -184,13 +184,13 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { assertEquals("Channel has been cancelled", exception1.message) reader.stop() reader.assertNoBytesReceived() - reader.assertCloseFrameReceived(mplexStreamId) + reader.assertCloseFrameReceived(yamuxStreamId) } @Test fun testReadPacketAfterRemoteCloses() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) assertFalse(muxedStream.input.isClosedForRead) assertFalse(muxedStream.output.isClosedForWrite) muxedStream.remoteClosesWriting() @@ -207,13 +207,13 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { muxedStream.awaitClosed() reader.stop() reader.assertNoBytesReceived() - reader.assertCloseFrameReceived(mplexStreamId) + reader.assertCloseFrameReceived(yamuxStreamId) } @Test fun testReadPacketAfterRemoteClosesDataInBuffer() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) assertFalse(muxedStream.input.isClosedForRead) assertFalse(muxedStream.output.isClosedForWrite) val random = Random.nextBytes(50000) @@ -237,13 +237,13 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { muxedStream.awaitClosed() reader.stop() reader.assertNoBytesReceived() - reader.assertCloseFrameReceived(mplexStreamId) + reader.assertCloseFrameReceived(yamuxStreamId) } @Test fun testNotReading() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) // It seems that the maximum of the input ByteReadChannel is 4088 bytes. So we have to provide enough data // to fill the input channel (~5 * 1000 bytes) and we also have to fill up the inputChannel with 16 packets. // So we have to provide 5 + 16 = 21 packets. @@ -265,7 +265,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { @Test fun testReadPacketAfterReset() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) val random = Random.nextBytes(50000) muxedStream.remoteSendsNewMessage(buildPacket { writeFully(random) }) assertFalse(muxedStream.input.isClosedForRead) @@ -274,7 +274,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { muxedStream.awaitClosed() assertTrue(muxedStream.input.isClosedForRead) assertTrue(muxedStream.output.isClosedForWrite) - reader.assertResetFrameReceived(mplexStreamId) + reader.assertResetFrameReceived(yamuxStreamId) assertStreamRemoved() val exception2 = assertThrows { muxedStream.input.readPacket(random.size) @@ -293,7 +293,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { repeat(1000) { val reader = FrameReader(this, pool) val random = Random.nextBytes(10000) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) muxedStream.output.writeFully(random) muxedStream.output.flush() muxedStream.close() @@ -308,7 +308,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { repeat(1000) { val reader = FrameReader(this, pool) val random = Random.nextBytes(10000) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) muxedStream.output.writeFully(random, 0, 5000) muxedStream.output.writeFully(random, 5000, 5000) muxedStream.output.flush() @@ -322,7 +322,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { @Test fun testWritePacketAfterChannelClose() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) muxedStream.output.close() yield() // Give the input coroutine a chance to cancel val exception1 = assertThrows { @@ -336,13 +336,13 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { muxedStream.awaitClosed() reader.stop() reader.assertNoBytesReceived() - reader.assertCloseFrameReceived(mplexStreamId) + reader.assertCloseFrameReceived(yamuxStreamId) } @Test fun testWritePacketAfterClose() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) assertFalse(muxedStream.input.isClosedForRead) assertFalse(muxedStream.output.isClosedForWrite) muxedStream.close() @@ -357,13 +357,13 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { assertEquals("The channel was closed", exception1.message) reader.stop() reader.assertNoBytesReceived() - reader.assertCloseFrameReceived(mplexStreamId) + reader.assertCloseFrameReceived(yamuxStreamId) } @Test fun testWritePacketAfterReset() = runTest { val reader = FrameReader(this, pool) - val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, mplexStreamId, mplexStreamName, pool) + val muxedStream = YamuxMuxedStream(this, session, reader.frameChannel, yamuxStreamId, yamuxStreamName, pool) val random = Random.nextBytes(50000) muxedStream.remoteSendsNewMessage(buildPacket { writeFully(random) }) assertFalse(muxedStream.input.isClosedForRead) @@ -372,7 +372,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { muxedStream.awaitClosed() assertTrue(muxedStream.input.isClosedForRead) assertTrue(muxedStream.output.isClosedForWrite) - reader.assertResetFrameReceived(mplexStreamId) + reader.assertResetFrameReceived(yamuxStreamId) assertStreamRemoved() val exception2 = assertThrows { muxedStream.output.writeFully(Random.nextBytes(100000)) @@ -385,7 +385,7 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { private fun assertStreamRemoved() { coVerify { session.removeStream(any()) } - assertEquals(mplexStreamId, streamIdSlot.captured) + assertEquals(yamuxStreamId, streamIdSlot.captured) } private fun assertStreamNotRemoved() { @@ -421,12 +421,12 @@ internal class YamuxMuxedStreamTest : TestWithLeakCheck { } } - fun assertResetFrameReceived(streamId: MplexStreamId) { + fun assertResetFrameReceived(streamId: YamuxStreamId) { assertNotNull(resetFrame) assertEquals(streamId, resetFrame?.streamId) } - fun assertCloseFrameReceived(streamId: MplexStreamId) { + fun assertCloseFrameReceived(streamId: YamuxStreamId) { assertNotNull(closeFrame) assertEquals(streamId, closeFrame?.streamId) }