Skip to content

[BUG] LEAK: ByteBuf.release() was not called before it's garbage-collected #4539

Closed
@akozich

Description

@akozich

Tapir version: 1.11.15

Scala version: 3.5.2

NettyFutureServer is used. We expose a public POST endpoint accepting a payload from the mobile clients. Due to the nature of mobile clients, sometimes the connection is closed in the middle of the request body. We observe the following error from time to time to time.

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	Hint: 'HttpStreamsServerHandler#0-body-publisher' will handle the message from this point.
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	org.playframework.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:194)
	org.playframework.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
	org.playframework.netty.http.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:96)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	io.opentelemetry.javaagent.shaded.instrumentation.netty.v4_1.internal.server.HttpServerRequestTracingHandler.channelRead(HttpServerRequestTracingHandler.java:44)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:793)
	io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.handle(AbstractEpollChannel.java:471)
	io.netty.channel.epoll.EpollIoHandler$DefaultEpollIoRegistration.handle(EpollIoHandler.java:307)
	io.netty.channel.epoll.EpollIoHandler.processReady(EpollIoHandler.java:489)
	io.netty.channel.epoll.EpollIoHandler.run(EpollIoHandler.java:444)
	io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:204)
	io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:175)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1073)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Unknown Source)

What is the problem?

SimpleSubscriber checks the received bytebuffer size. If the body is fully received, it releases the bytebuffer. Otherwise, it stores a link to the bytebuffer to be merged with the upcoming chunks. Probbaly, the Future returned from the SimpleSubscriber is garbage collected before receiving the close signal, and in this case byte buffer is collected without release.

How to reproduce?

Maybe you can provide code to reproduce the problem?

here's the server

import io.circe.derivation.Configuration
import io.circe.derivation.ConfiguredCodec
import sttp.model.StatusCode
import sttp.tapir.*
import sttp.tapir.Schema
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.server.netty.NettyFutureServer
import sttp.tapir.server.netty.NettyFutureServerOptions

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

given Configuration = Configuration.default.withSnakeCaseMemberNames.withDefaults

case class PermissionsInput(
  permissions: Seq[PermissionInput]
) derives ConfiguredCodec, Schema

case class PermissionInput(
  platform: String,
  id: String,
  accessType: String,
  state: String
) derives ConfiguredCodec, Schema

object Server {

  private val permissionsEndpoint =
    endpoint.put
      .securityIn(header[Option[String]]("X-Token-1"))
      .in("permissions")
      .in(jsonBody[PermissionsInput])
      .out(statusCode)

  private val permissionsServerEndpoint = permissionsEndpoint
    .serverSecurityLogic {
      case Some(value) => Future.successful(Right(value))
      case None        => Future.successful(Left(StatusCode.Unauthorized))
    }
    .serverLogicSuccess { (token: String) => (permissionsInput: PermissionsInput) =>
      Thread.sleep(100)
      println(permissionsInput.permissions.size)
      Future.successful(StatusCode.Created)
    }

  private val server = NettyFutureServer()
    .host("0.0.0.0")
    .options(
      NettyFutureServerOptions.customiseInterceptors.serverLog(
        NettyFutureServerOptions.defaultServerLog.logAllDecodeFailures(true).logWhenHandled(true)
      ).options
    )
    .port(8080)
    .addEndpoints(permissionsServerEndpoint :: Nil)

  def main(args: Array[String]): Unit = {
    server.start().andThen {
      case scala.util.Success(_) =>
        println("Server started on port 8080")
      case scala.util.Failure(exception) =>
        println(s"Failed to start server: ${exception.getMessage}")
    }
  }

}

and here's the client sending truncated requests

import java.io.PrintWriter
import java.net.Socket
import java.util.concurrent.Executors

object TruncatedRequestsSender extends App {
  private val pool = Executors.newFixedThreadPool(4)

  private val client: Runnable = { () =>
    var i = 0

    while (true) {
      println(s"${Thread.currentThread().getName}:$i")

      val socket = new Socket("localhost", 8080)
      val out = new PrintWriter(socket.getOutputStream, true)

      out.print(
        """PUT /permissions HTTP/1.1
          |Host: example.com
          |Content-Type: application/json
          |Content-Length: 500
          |
          |{
          |  "permissions": [
          |    {
          |      "id": "sleep",""".stripMargin
      )

      out.close()
      socket.close()

      Thread.sleep(10)

      i += 1
    }
  }
  pool.submit(client)
  pool.submit(client)
  pool.submit(client)
  pool.submit(client)
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions