Skip to content

Commit

Permalink
Yamux update
Browse files Browse the repository at this point in the history
  • Loading branch information
erwin-kok committed Jan 25, 2024
1 parent 7c90374 commit df88a98
Show file tree
Hide file tree
Showing 15 changed files with 366 additions and 1,486 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import org.erwinkok.libp2p.core.resourcemanager.PeerScope
import org.erwinkok.result.Result

interface StreamMuxerTransport {
suspend fun newConnection(connection: Connection, initiator: Boolean, scope: PeerScope): Result<StreamMuxerConnection>
suspend fun newConnection(connection: Connection, initiator: Boolean, scope: PeerScope?): Result<StreamMuxerConnection>
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.erwinkok.result.Result
class MplexStreamMuxerTransport private constructor(
private val coroutineScope: CoroutineScope,
) : StreamMuxerTransport {
override suspend fun newConnection(connection: Connection, initiator: Boolean, scope: PeerScope): Result<StreamMuxerConnection> {
override suspend fun newConnection(connection: Connection, initiator: Boolean, scope: PeerScope?): Result<StreamMuxerConnection> {
return Ok(MplexStreamMuxerConnection(coroutineScope, connection, initiator))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2024 Erwin Kok. BSD-3-Clause license. See LICENSE file for more details.
package org.erwinkok.libp2p.muxer.yamux

import org.erwinkok.result.Ok
import org.erwinkok.result.Result

interface MemoryManager {
fun reserveMemory(size: Int, prio: UByte): Result<Unit>
fun releaseMemory(size: Int)
fun done()

companion object {
val NullMemoryManager = object : MemoryManager {
override fun reserveMemory(size: Int, prio: UByte): Result<Unit> {
return Ok(Unit)
}

override fun releaseMemory(size: Int) {
}

override fun done() {
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2024 Erwin Kok. BSD-3-Clause license. See LICENSE file for more details.
package org.erwinkok.libp2p.muxer.yamux

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import mu.KotlinLogging
import org.erwinkok.libp2p.core.base.AwaitableClosable
import org.erwinkok.libp2p.core.network.Connection
import org.erwinkok.result.Ok
import org.erwinkok.result.Result
import java.util.concurrent.atomic.AtomicLong

private val logger = KotlinLogging.logger {}

class Session(
private val scope: CoroutineScope,
private val config: YamuxConfig,
private val connection: Connection,
private val client: Boolean,
private val memoryManager: (() -> Result<MemoryManager>)? = { Ok(MemoryManager.NullMemoryManager) }
) : AwaitableClosable {
private val _context = Job(scope.coroutineContext[Job])

override val jobContext: Job get() = _context

// rtt int64 // to be accessed atomically, in nanoseconds

// remoteGoAway indicates the remote side does
// not want futher connections. Must be first for alignment.
// remoteGoAway int32

// localGoAway indicates that we should stop
// accepting futher connections. Must be first for alignment.
// localGoAway int32

// nextStreamID is the next stream we should send. This depends if we are a client/server.
private val nextStreamID = AtomicLong(0)

// pings is used to track inflight pings
// pingLock sync.Mutex
// pingID uint32
// activePing *ping

// streams maps a stream id to a stream, and inflight has an entry
// for any outgoing stream that has not yet been established. Both are
// protected by streamLock.
// numIncomingStreams uint32
// streams map[uint32]*Stream
// inflight map[uint32]struct{}
// streamLock sync.Mutex

// synCh acts like a semaphore. It is sized to the AcceptBacklog which
// is assumed to be symmetric between the client and server. This allows
// the client to avoid exceeding the backlog and instead blocks the open.
// synCh chan struct{}

// acceptCh is used to pass ready streams to the client
// acceptCh chan *Stream

// sendCh is used to send messages
// sendCh chan []byte

// pingCh and pingCh are used to send pings and pongs
// pongCh, pingCh chan uint32

// recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown
// recvDoneCh chan struct{}

// sendDoneCh is closed when send() exits to avoid a race
// between returning from a Stream.Write and exiting from the send loop
// (which may be reading a buffer on-load-from Stream.Write).
// sendDoneCh chan struct{}

// shutdown is used to safely close a session
// shutdown bool
// shutdownErr error
// shutdownCh chan struct{}
// shutdownLock sync.Mutex

// keepaliveTimer is a periodic timer for keepalive messages. It's nil
// when keepalives are disabled.
// keepaliveLock sync.Mutex
// keepaliveTimer *time.Timer
// keepaliveActive bool

init {
if (client) {
nextStreamID.set(1)
} else {
nextStreamID.set(2)
}
if (config.enableKeepAlive) {

}
}

override fun close() {

}

fun openStream(name: String?): Result<YamuxMuxedStream> {
TODO("Not yet implemented")
}

fun acceptStream(): Result<YamuxMuxedStream> {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024 Erwin Kok. BSD-3-Clause license. See LICENSE file for more details.
package org.erwinkok.libp2p.muxer.yamux

import org.erwinkok.libp2p.muxer.yamux.YamuxConst.initialStreamWindow
import org.erwinkok.libp2p.muxer.yamux.YamuxConst.maxStreamWindow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.microseconds
import kotlin.time.Duration.Companion.seconds

data class YamuxConfig(
// AcceptBacklog is used to limit how many streams may be
// waiting an accept.
val acceptBacklog: Int = 256,

// PingBacklog is used to limit how many ping acks we can queue.
val pingBacklog: Int = 32,

// EnableKeepalive is used to do a period keep alive
// messages using a ping.
val enableKeepAlive: Boolean = true,

// KeepAliveInterval is how often to perform the keep alive
val keepAliveInterval: Duration = 30.seconds,

// MeasureRTTInterval is how often to re-measure the round trip time
val measureRTTInterval: Duration = 30.seconds,

// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
// we which will suspect a problem with the underlying connection and
// close it. This is only applied to writes, where's there's generally
// an expectation that things will move along quickly.
val connectionWriteTimeout: Duration = 10.seconds,

// MaxIncomingStreams is maximum number of concurrent incoming streams
// that we accept. If the peer tries to open more streams, those will be
// reset immediately.
val maxIncomingStreams: UInt = 1000u,

// InitialStreamWindowSize is used to control the initial
// window size that we allow for a stream.
val initialStreamWindowSize: UInt = initialStreamWindow,

// MaxStreamWindowSize is used to control the maximum
// window size that we allow for a stream.
val maxStreamWindowSize: UInt = maxStreamWindow,

// WriteCoalesceDelay is the maximum amount of time we'll delay
// coalescing a packet before sending it. This should be on the order of
// micro-milliseconds.
val writeCoalesceDelay: Duration = 100.microseconds,

// MaxMessageSize is the maximum size of a message that we'll send on a
// stream. This ensures that a single stream doesn't hog a connection.
val maxMessageSize: UInt = 64u * 1024u,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) 2024 Erwin Kok. BSD-3-Clause license. See LICENSE file for more details.
package org.erwinkok.libp2p.muxer.yamux

object YamuxConst {
// initialStreamWindow is the initial stream window size.
// It's not an implementation choice, the value defined in the specification.
val initialStreamWindow: UInt = 256u * 1024u
val maxStreamWindow: UInt = 16u * 1024u * 1024u
}
Loading

0 comments on commit df88a98

Please sign in to comment.