Skip to content
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.tls.TLSContext#Builder.fs2$io$net$tls$TLSContextCompanionPlatform$BuilderPlatform$$$outer"
)
),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.metrics")
)

lazy val root = tlCrossRootProject
Expand Down
12 changes: 11 additions & 1 deletion io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ private[fs2] trait ioplatform {
def writeWritable[F[_]](
writable: F[Writable],
endAfterUse: Boolean = true
)(implicit F: Async[F]): Pipe[F, Byte, Nothing] =
writeWritableInstrumented(writable, endAfterUse, _ => ())

private[io] def writeWritableInstrumented[F[_]](
writable: F[Writable],
endAfterUse: Boolean = true,
onWrite: Chunk[Byte] => Unit
)(implicit F: Async[F]): Pipe[F, Byte, Nothing] =
in =>
Stream
Expand All @@ -201,7 +208,10 @@ private[fs2] trait ioplatform {
F.delay {
writable.write(
chunk.toUint8Array,
e => cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException))
e => {
onWrite(chunk)
cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException))
}
)
Some(F.delay(writable.destroy()))
}
Expand Down
25 changes: 21 additions & 4 deletions io/js/src/main/scala/fs2/io/net/SocketPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import cats.data.{Kleisli, OptionT}
import cats.effect.{Async, Resource}
import com.comcast.ip4s.{GenSocketAddress, IpAddress, SocketAddress}
import fs2.io.internal.{facade, SuspendedStream}
import java.util.concurrent.atomic.AtomicLong

private[net] trait SocketCompanionPlatform {

Expand Down Expand Up @@ -55,15 +56,27 @@ private[net] trait SocketCompanionPlatform {
val address: GenSocketAddress,
val peerAddress: GenSocketAddress
)(implicit F: Async[F])
extends Socket[F] {
extends Socket[F] { outer =>

private val totalBytesRead: AtomicLong = new AtomicLong(0L)
private val totalBytesWritten: AtomicLong = new AtomicLong(0L)
private val incompleteWriteCount: AtomicLong = new AtomicLong(0L)

def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics {
def totalBytesRead(): Long = outer.totalBytesRead.get
def totalBytesWritten(): Long = outer.totalBytesWritten.get
def incompleteWriteCount(): Long = outer.incompleteWriteCount.get
}

private def read(
f: Stream[F, Byte] => Pull[F, Chunk[Byte], Option[(Chunk[Byte], Stream[F, Byte])]]
): F[Option[Chunk[Byte]]] =
readStream
.getAndUpdate(Kleisli(f).flatMapF {
case Some((chunk, tail)) => Pull.output1(chunk).as(tail)
case None => Pull.pure(Stream.empty)
case Some((chunk, tail)) =>
totalBytesRead.addAndGet(chunk.size.toLong)
Pull.output1(chunk).as(tail)
case None => Pull.pure(Stream.empty)
}.run)
.compile
.last
Expand Down Expand Up @@ -113,7 +126,11 @@ private[net] trait SocketCompanionPlatform {
Stream.chunk(bytes).through(writes).compile.drain

override def writes: Pipe[F, Byte, Nothing] =
writeWritable(F.pure(sock), endAfterUse = false)
writeWritableInstrumented(
F.pure(sock),
endAfterUse = false,
c => { totalBytesWritten.addAndGet(c.size.toLong); () }
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,6 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type =>
override def getOption[A](key: SocketOption.Key[A]) = underlying.getOption(key)
override def setOption[A](key: SocketOption.Key[A], value: A) = underlying.setOption(key, value)
override def supportedOptions = underlying.supportedOptions
override def metrics = underlying.metrics
}
}
31 changes: 26 additions & 5 deletions io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import cats.syntax.all._

import java.nio.channels.{AsynchronousSocketChannel, CompletionHandler}
import java.nio.{Buffer, ByteBuffer}
import java.util.concurrent.atomic.AtomicLong

private[net] trait SocketCompanionPlatform {
private[net] def forAsync[F[_]: Async](
Expand All @@ -50,6 +51,7 @@ private[net] trait SocketCompanionPlatform {
extends Socket[F] {
private[this] final val defaultReadSize = 8192
private[this] var readBuffer: ByteBuffer = ByteBuffer.allocate(defaultReadSize)
protected val totalBytesRead: AtomicLong = new AtomicLong(0L)

private def withReadBuffer[A](size: Int)(f: ByteBuffer => F[A]): F[A] =
readMutex.lock.surround {
Expand All @@ -65,6 +67,14 @@ private[net] trait SocketCompanionPlatform {
/** Performs a single channel read operation in to the supplied buffer. */
protected def readChunk(buffer: ByteBuffer): F[Int]

/** Instrumented version of `readChunk`. */
protected def readChunk0(buffer: ByteBuffer): F[Int] =
readChunk(buffer).map { bytesRead =>
if (bytesRead >= 0)
totalBytesRead.addAndGet(bytesRead.toLong): Unit
bytesRead
}

/** Copies the contents of the supplied buffer to a `Chunk[Byte]` and clears the buffer contents. */
private def releaseBuffer(buffer: ByteBuffer): F[Chunk[Byte]] =
F.delay {
Expand All @@ -83,7 +93,7 @@ private[net] trait SocketCompanionPlatform {

def read(max: Int): F[Option[Chunk[Byte]]] =
withReadBuffer(max) { buffer =>
readChunk(buffer).flatMap { read =>
readChunk0(buffer).flatMap { read =>
if (read < 0) F.pure(None)
else releaseBuffer(buffer).map(Some(_))
}
Expand All @@ -92,7 +102,7 @@ private[net] trait SocketCompanionPlatform {
def readN(max: Int): F[Chunk[Byte]] =
withReadBuffer(max) { buffer =>
def go: F[Chunk[Byte]] =
readChunk(buffer).flatMap { readBytes =>
readChunk0(buffer).flatMap { readBytes =>
if (readBytes < 0 || buffer.position() >= max)
releaseBuffer(buffer)
else go
Expand All @@ -114,11 +124,20 @@ private[net] trait SocketCompanionPlatform {
val peerAddress: GenSocketAddress
)(implicit F: Async[F])
extends BufferedReads[F](readMutex)
with SocketInfo.AsyncSocketInfo[F] {
with SocketInfo.AsyncSocketInfo[F] { outer =>

protected def asyncInstance = F
protected def channel = ch

private val totalBytesWritten: AtomicLong = new AtomicLong(0L)
private val incompleteWriteCount: AtomicLong = new AtomicLong(0L)

def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics {
def totalBytesRead(): Long = outer.totalBytesRead.get
def totalBytesWritten(): Long = outer.totalBytesWritten.get
def incompleteWriteCount(): Long = outer.incompleteWriteCount.get
}

protected def readChunk(buffer: ByteBuffer): F[Int] =
F.async[Int] { cb =>
ch.read(
Expand All @@ -139,9 +158,11 @@ private[net] trait SocketCompanionPlatform {
)
F.delay(Some(endOfOutput.voidError))
}.flatMap { written =>
if (written >= 0 && buff.remaining() > 0)
totalBytesWritten.addAndGet(written.toLong)
if (written >= 0 && buff.remaining() > 0) {
incompleteWriteCount.incrementAndGet()
go(buff)
else F.unit
} else F.unit
}
writeMutex.lock.surround {
F.delay(bytes.toByteBuffer).flatMap(go)
Expand Down
21 changes: 18 additions & 3 deletions io/jvm/src/main/scala/fs2/io/net/AsyncUnixSocketsProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import fs2.io.file.{Files, FileHandle, SyncFileHandle}

import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import java.util.concurrent.atomic.AtomicLong

private[net] abstract class AsyncUnixSocketsProvider[F[_]: Files](implicit F: Async[F])
extends UnixSocketsProvider[F] {
Expand Down Expand Up @@ -98,19 +99,33 @@ private[net] object AsyncUnixSocketsProvider {
val peerAddress: UnixSocketAddress
)(implicit F: Async[F])
extends Socket.BufferedReads[F](readMutex)
with SocketInfo.OptionsSupport[F] {
with SocketInfo.OptionsSupport[F] { outer =>

protected def asyncInstance = F
protected def channel = ch

private val totalBytesWritten: AtomicLong = new AtomicLong(0L)
private val incompleteWriteCount: AtomicLong = new AtomicLong(0L)

def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics {
def totalBytesRead(): Long = outer.totalBytesRead.get
def totalBytesWritten(): Long = outer.totalBytesWritten.get
def incompleteWriteCount(): Long = outer.incompleteWriteCount.get
}

def readChunk(buff: ByteBuffer): F[Int] =
evalOnVirtualThreadIfAvailable(F.blocking(ch.read(buff)))
.cancelable(close)

def write(bytes: Chunk[Byte]): F[Unit] = {
def go(buff: ByteBuffer): F[Unit] =
F.blocking(ch.write(buff)).cancelable(close) *>
F.delay(buff.remaining <= 0).ifM(F.unit, go(buff))
F.blocking(ch.write(buff)).cancelable(close).flatMap { written =>
totalBytesWritten.addAndGet(written.toLong)
if (written >= 0 && buff.remaining() > 0) {
incompleteWriteCount.incrementAndGet()
go(buff)
} else F.unit
}

writeMutex.lock.surround {
F.delay(bytes.toByteBuffer).flatMap(buffer => evalOnVirtualThreadIfAvailable(go(buffer)))
Expand Down
16 changes: 14 additions & 2 deletions io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import java.nio.ByteBuffer
import java.nio.channels.SelectionKey.OP_READ
import java.nio.channels.SelectionKey.OP_WRITE
import java.nio.channels.SocketChannel
import java.util.concurrent.atomic.AtomicLong

private final class SelectingSocket[F[_]: LiftIO] private (
selector: Selector,
Expand All @@ -45,11 +46,20 @@ private final class SelectingSocket[F[_]: LiftIO] private (
val peerAddress: SocketAddress[IpAddress]
)(implicit F: Async[F])
extends Socket.BufferedReads(readMutex)
with SocketInfo.AsyncSocketInfo[F] {
with SocketInfo.AsyncSocketInfo[F] { outer =>

protected def asyncInstance = F
protected def channel = ch

private val totalBytesWritten: AtomicLong = new AtomicLong(0L)
private val incompleteWriteCount: AtomicLong = new AtomicLong(0L)

def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics {
def totalBytesRead(): Long = outer.totalBytesRead.get
def totalBytesWritten(): Long = outer.totalBytesWritten.get
def incompleteWriteCount(): Long = outer.incompleteWriteCount.get
}

override def localAddress: F[SocketAddress[IpAddress]] =
asyncInstance.pure(address)

Expand All @@ -65,10 +75,12 @@ private final class SelectingSocket[F[_]: LiftIO] private (
def write(bytes: Chunk[Byte]): F[Unit] = {
def go(buf: ByteBuffer): F[Unit] =
F.delay {
ch.write(buf)
val written = ch.write(buf)
totalBytesWritten.addAndGet(written.toLong)
buf.remaining()
}.flatMap { remaining =>
if (remaining > 0) {
incompleteWriteCount.incrementAndGet()
selector.select(ch, OP_WRITE).to *> go(buf)
} else F.unit
}
Expand Down
3 changes: 3 additions & 0 deletions io/jvm/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type =>
def applicationProtocol: F[String] =
engine.applicationProtocol

def metrics: SocketMetrics =
socket.metrics

@deprecated("3.13.0", "No replacement; sockets are open until they are finalized")
def isOpen: F[Boolean] = socket.isOpen
}
Expand Down
1 change: 1 addition & 0 deletions io/jvm/src/test/scala/fs2/io/net/tls/TLSSocketSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ class TLSSocketSuite extends TLSSuite {
@deprecated("", "")
def remoteAddress = raw.remoteAddress
def writes = raw.writes
def metrics = raw.metrics

def address = raw.address
def getOption[A](key: SocketOption.Key[A]) = raw.getOption(key)
Expand Down
26 changes: 21 additions & 5 deletions io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import scala.scalanative.posix.unistd
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._

import java.util.concurrent.atomic.AtomicLong

import FdPollingSocket._

private final class FdPollingSocket[F[_]: LiftIO] private (
Expand All @@ -45,7 +47,17 @@ private final class FdPollingSocket[F[_]: LiftIO] private (
val address: GenSocketAddress,
val peerAddress: GenSocketAddress
)(implicit F: Async[F])
extends Socket[F] {
extends Socket[F] { outer =>

private val totalBytesRead: AtomicLong = new AtomicLong(0L)
private val totalBytesWritten: AtomicLong = new AtomicLong(0L)
private val incompleteWriteCount: AtomicLong = new AtomicLong(0L)

def metrics: SocketMetrics = new SocketMetrics.UnsealedSocketMetrics {
def totalBytesRead() = outer.totalBytesRead.get
def totalBytesWritten() = outer.totalBytesWritten.get
def incompleteWriteCount() = outer.incompleteWriteCount.get
}

def localAddress = F.pure(address.asIpUnsafe)
def remoteAddress = F.pure(peerAddress.asIpUnsafe)
Expand All @@ -60,9 +72,10 @@ private final class FdPollingSocket[F[_]: LiftIO] private (
handle
.pollReadRec(()) { _ =>
IO(guard(unistd.read(fd, buf, maxBytes.toUSize))).flatMap { readed =>
if (readed > 0)
if (readed > 0) {
totalBytesRead.addAndGet(readed.toLong)
IO(Right(Some(Chunk.fromBytePtr(buf, readed))))
else if (readed == 0)
} else if (readed == 0)
IO.pure(Right(None))
else
IO.pure(Left(()))
Expand All @@ -76,6 +89,7 @@ private final class FdPollingSocket[F[_]: LiftIO] private (
def go(pos: Int): IO[Either[Int, Chunk[Byte]]] =
IO(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toUSize))).flatMap { readed =>
if (readed > 0) {
totalBytesRead.addAndGet(readed.toLong)
val newPos = pos + readed
if (newPos < numBytes) go(newPos)
else IO(Right(Chunk.fromBytePtr(buf, newPos)))
Expand Down Expand Up @@ -103,10 +117,12 @@ private final class FdPollingSocket[F[_]: LiftIO] private (
guard(unistd.write(fd, buf.atUnsafe(offset + pos), (length - pos).toUSize))
}.flatMap { wrote =>
if (wrote >= 0) {
totalBytesWritten.addAndGet(wrote.toLong)
val newPos = pos + wrote
if (newPos < length)
if (newPos < length) {
incompleteWriteCount.incrementAndGet()
go(newPos)
else
} else
IO.pure(Either.unit)
} else
IO.pure(Left(pos))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type =>

def applicationProtocol: F[String] = connection.applicationProtocol

def metrics: SocketMetrics = socket.metrics

@deprecated("3.13.0", "No replacement; sockets are open until they are finalized")
def isOpen: F[Boolean] = socket.isOpen
}
Expand Down
2 changes: 2 additions & 0 deletions io/shared/src/main/scala/fs2/io/net/Socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ trait Socket[F[_]] extends SocketInfo[F] {
go(offset, count).through(writes)
}

def metrics: SocketMetrics

// Deprecated members

@deprecated("3.13.0", "No replacement; sockets are open until they are finalized")
Expand Down
Loading