From 0833d7642e0f57659ee65c3faba10809d0a06d20 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 5 Jul 2023 15:48:10 +0200 Subject: [PATCH 01/16] Avoid to run Beats parser and Beats protocol handler in separate executors group (beatsHandlerExecutorGroup) --- src/main/java/org/logstash/beats/Server.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index c343aaf6..5c4e0d25 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -112,7 +112,6 @@ private class BeatsInitializer extends ChannelInitializer { private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5; private final EventExecutorGroup idleExecutorGroup; - private final EventExecutorGroup beatsHandlerExecutorGroup; private final IMessageListener localMessageListener; private final int localClientInactivityTimeoutSeconds; @@ -121,7 +120,6 @@ private class BeatsInitializer extends ChannelInitializer { this.localMessageListener = messageListener; this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); - beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); } public void initChannel(SocketChannel socket){ @@ -134,7 +132,8 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); - pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener)); + pipeline.addLast(new BeatsParser()); + pipeline.addLast(new BeatsHandler(localMessageListener)); } @@ -152,7 +151,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E public void shutdownEventExecutor() { try { idleExecutorGroup.shutdownGracefully().sync(); - beatsHandlerExecutorGroup.shutdownGracefully().sync(); } catch (InterruptedException e) { throw new IllegalStateException(e); } From 79046db3bdb47794116417c2356daa27c651473c Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 5 Jul 2023 15:59:57 +0200 Subject: [PATCH 02/16] Ported core part of original #410 PR --- .../java/org/logstash/beats/BeatsParser.java | 84 +++++++++++++++++-- 1 file changed, 79 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 61337d3b..5e03663f 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -3,8 +3,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.DecoderException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -14,12 +16,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.zip.Inflater; import java.util.zip.InflaterOutputStream; public class BeatsParser extends ByteToMessageDecoder { private final static Logger logger = LogManager.getLogger(BeatsParser.class); + private final static long maxDirectMemory = io.netty.util.internal.PlatformDependent.maxDirectMemory(); private Batch batch; @@ -45,15 +49,18 @@ private enum States { private int requiredBytes = 0; private int sequence = 0; private boolean decodingCompressedBuffer = false; + private long usedDirectMemory; + private boolean closeCalled = false; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws InvalidFrameProtocolException, IOException { - if(!hasEnoughBytes(in)) { - if (decodingCompressedBuffer){ + if (!hasEnoughBytes(in)) { + if (decodingCompressedBuffer) { throw new InvalidFrameProtocolException("Insufficient bytes in compressed content to decode: " + currentState); } return; } + usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); switch (currentState) { case READ_HEADER: { @@ -178,6 +185,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t case READ_COMPRESSED_FRAME: { logger.trace("Running: READ_COMPRESSED_FRAME"); + + if (usedDirectMemory + requiredBytes > maxDirectMemory * 0.90) { + ctx.channel().config().setAutoRead(false); + ctx.close(); + closeCalled = true; + throw new IOException("not enough memory to decompress this from " + ctx.channel().id()); + } inflateCompressedFrame(ctx, in, (buffer) -> { transition(States.READ_HEADER); @@ -188,6 +202,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } } finally { decodingCompressedBuffer = false; + ctx.channel().config().setAutoRead(false); + ctx.channel().eventLoop().schedule(() -> { + ctx.channel().config().setAutoRead(true); + }, 5, TimeUnit.MILLISECONDS); transition(States.READ_HEADER); } }); @@ -195,9 +213,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } case READ_JSON: { logger.trace("Running: READ_JSON"); - ((V2Batch)batch).addMessage(sequence, in, requiredBytes); - if(batch.isComplete()) { - if(logger.isTraceEnabled()) { + ((V2Batch) batch).addMessage(sequence, in, requiredBytes); + if (batch.isComplete()) { + if (logger.isTraceEnabled()) { logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); } out.add(batch); @@ -256,6 +274,62 @@ private void batchComplete() { batch = null; } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //System.out.println("channelRead(" + ctx.channel().isActive() + ": " + ctx.channel().id() + ":" + currentState + ":" + decodingCompressedBuffer); + if (closeCalled) { + ((ByteBuf) msg).release(); + //if(batch != null) batch.release(); + return; + } + usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); + + // If we're just beginning a new frame on this channel, + // don't accumulate more data for 25 ms if usage of direct memory is above 20% + // + // The goal here is to avoid thundering herd: many beats connecting and sending data + // at the same time. As some channels progress to other states they'll use more memory + // but also give it back once a full batch is read. + if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) { + if (usedDirectMemory > (maxDirectMemory * 0.40)) { + ctx.channel().config().setAutoRead(false); + //System.out.println("pausing reads on " + ctx.channel().id()); + ctx.channel().eventLoop().schedule(() -> { + //System.out.println("resuming reads on " + ctx.channel().id()); + ctx.channel().config().setAutoRead(true); + }, 200, TimeUnit.MILLISECONDS); + } else { + //System.out.println("no need to pause reads on " + ctx.channel().id()); + } + } else if (usedDirectMemory > maxDirectMemory * 0.90) { + ctx.channel().config().setAutoRead(false); + ctx.close(); + closeCalled = true; + ((ByteBuf) msg).release(); + if (batch != null) batch.release(); + throw new IOException("about to explode, cut them all down " + ctx.channel().id()); + } + super.channelRead(ctx, msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); + if (cause instanceof DecoderException) { + ctx.channel().config().setAutoRead(false); + if (!closeCalled) ctx.close(); + } else if (cause instanceof OutOfMemoryError) { + cause.printStackTrace(); + ctx.channel().config().setAutoRead(false); + if (!closeCalled) ctx.close(); + } else if (cause instanceof IOException) { + ctx.channel().config().setAutoRead(false); + if (!closeCalled) ctx.close(); + } else { + super.exceptionCaught(ctx, cause); + } + } + @FunctionalInterface private interface CheckedConsumer { void accept(T t) throws IOException; From 59f1d3b1dfdca77266768d901292c1ce93cfd45e Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 7 Jul 2023 14:36:51 +0200 Subject: [PATCH 03/16] Added pull way of fetching data --- .../java/org/logstash/beats/BeatsParser.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 5e03663f..4a0530fd 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -52,6 +52,8 @@ private enum States { private long usedDirectMemory; private boolean closeCalled = false; + private boolean stopAutoreadRequested = false; + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws InvalidFrameProtocolException, IOException { if (!hasEnoughBytes(in)) { @@ -292,17 +294,25 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception // but also give it back once a full batch is read. if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) { if (usedDirectMemory > (maxDirectMemory * 0.40)) { - ctx.channel().config().setAutoRead(false); - //System.out.println("pausing reads on " + ctx.channel().id()); - ctx.channel().eventLoop().schedule(() -> { - //System.out.println("resuming reads on " + ctx.channel().id()); - ctx.channel().config().setAutoRead(true); - }, 200, TimeUnit.MILLISECONDS); + if (!stopAutoreadRequested) { + ctx.channel().config().setAutoRead(false); + stopAutoreadRequested = true; + logger.info("Set channel {} to autoread FALSE (> 40%)", ctx.channel()); + //System.out.println("pausing reads on " + ctx.channel().id()); + ctx.channel().eventLoop().schedule(() -> { + //System.out.println("resuming reads on " + ctx.channel().id()); + ctx.channel().config().setAutoRead(true); + stopAutoreadRequested = false; + logger.info("Set channel {} to autoread TRUE after 200 ms", ctx.channel()); + }, 200, TimeUnit.MILLISECONDS); + } } else { //System.out.println("no need to pause reads on " + ctx.channel().id()); } } else if (usedDirectMemory > maxDirectMemory * 0.90) { ctx.channel().config().setAutoRead(false); + stopAutoreadRequested = true; + logger.info("Set channel {} to autoread FALSE (> 90%)", ctx.channel()); ctx.close(); closeCalled = true; ((ByteBuf) msg).release(); @@ -312,6 +322,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception super.channelRead(ctx, msg); } + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } + + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); From 09abc1a05b91c612c0ccf747f89ae2fcced67ceb Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 12 Jul 2023 11:48:28 +0200 Subject: [PATCH 04/16] Extracted the pull mode part into a separate handler --- .../java/org/logstash/beats/BeatsParser.java | 16 --------- .../logstash/beats/FlowLimiterHandler.java | 35 +++++++++++++++++++ src/main/java/org/logstash/beats/Server.java | 1 + 3 files changed, 36 insertions(+), 16 deletions(-) create mode 100644 src/main/java/org/logstash/beats/FlowLimiterHandler.java diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 4a0530fd..d79d0728 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -322,22 +322,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception super.channelRead(ctx, msg); } - @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - if (!ctx.channel().config().isAutoRead()) { - ctx.channel().read(); - } - } - - @Override - public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { - super.channelReadComplete(ctx); - if (!ctx.channel().config().isAutoRead()) { - ctx.channel().read(); - } - } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); diff --git a/src/main/java/org/logstash/beats/FlowLimiterHandler.java b/src/main/java/org/logstash/beats/FlowLimiterHandler.java new file mode 100644 index 00000000..5ebeed04 --- /dev/null +++ b/src/main/java/org/logstash/beats/FlowLimiterHandler.java @@ -0,0 +1,35 @@ +package org.logstash.beats; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * Configure the channel where it's installed to operate the read in pull mode, + * disabling the autoread and explicitly invoking the read operation. + * */ +@ChannelHandler.Sharable +public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRegistered(final ChannelHandlerContext ctx) throws Exception { + ctx.channel().config().setAutoRead(false); + super.channelRegistered(ctx); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } + + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } +} diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 5c4e0d25..d135a548 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -132,6 +132,7 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); + pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new BeatsParser()); pipeline.addLast(new BeatsHandler(localMessageListener)); } From 084e0d19df1e15eeac2bc5f938bcb68dfc935cd8 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 12 Jul 2023 14:52:51 +0200 Subject: [PATCH 05/16] Update the flow control handler to avoid new reads if the channel becomes not writable, to excert backpressure to the sender system --- .../logstash/beats/FlowLimiterHandler.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/logstash/beats/FlowLimiterHandler.java b/src/main/java/org/logstash/beats/FlowLimiterHandler.java index 5ebeed04..5d88effd 100644 --- a/src/main/java/org/logstash/beats/FlowLimiterHandler.java +++ b/src/main/java/org/logstash/beats/FlowLimiterHandler.java @@ -1,16 +1,24 @@ package org.logstash.beats; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Configure the channel where it's installed to operate the read in pull mode, * disabling the autoread and explicitly invoking the read operation. + * The flow control to keep the outgoing buffer under control is done + * avoiding to read in new bytes if the outgoing direction became not writable, this + * excert back pressure to the TCP layer and ultimately to the upstream system. * */ @ChannelHandler.Sharable public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter { + private final static Logger logger = LogManager.getLogger(FlowLimiterHandler.class); + @Override public void channelRegistered(final ChannelHandlerContext ctx) throws Exception { ctx.channel().config().setAutoRead(false); @@ -20,7 +28,7 @@ public void channelRegistered(final ChannelHandlerContext ctx) throws Exception @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - if (!ctx.channel().config().isAutoRead()) { + if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) { ctx.channel().read(); } } @@ -28,8 +36,21 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); - if (!ctx.channel().config().isAutoRead()) { + if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) { ctx.channel().read(); } } + + private boolean isAutoreadDisabled(Channel channel) { + return !channel.config().isAutoRead(); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + ctx.channel().read(); + super.channelWritabilityChanged(ctx); + + logger.debug("Writability on channel {} changed to {}", ctx.channel(), ctx.channel().isWritable()); + } + } From 23f9d87bc4b809a81272f366109d13dbcd3ab082 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 12 Jul 2023 16:00:54 +0200 Subject: [PATCH 06/16] Separated the logic to drop incoming connections into specific handler On new channel registration (that correspond to a new client connection), verifies the direct memory stastus to understand if almost the totality max direct memory is reached and also if the majoproity of that space is used by pinned byte buffers. If the codition is verified that means direct memory avvailable is terminating, so no new connection would help in the situation, and the incoming new connections are closed. --- .../java/org/logstash/beats/BeatsParser.java | 80 ------------------- .../logstash/beats/FlowLimiterHandler.java | 6 +- src/main/java/org/logstash/beats/Server.java | 1 + .../beats/ThunderingGuardHandler.java | 40 ++++++++++ 4 files changed, 44 insertions(+), 83 deletions(-) create mode 100644 src/main/java/org/logstash/beats/ThunderingGuardHandler.java diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index d79d0728..635d1772 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -23,7 +23,6 @@ public class BeatsParser extends ByteToMessageDecoder { private final static Logger logger = LogManager.getLogger(BeatsParser.class); - private final static long maxDirectMemory = io.netty.util.internal.PlatformDependent.maxDirectMemory(); private Batch batch; @@ -49,10 +48,6 @@ private enum States { private int requiredBytes = 0; private int sequence = 0; private boolean decodingCompressedBuffer = false; - private long usedDirectMemory; - private boolean closeCalled = false; - - private boolean stopAutoreadRequested = false; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws InvalidFrameProtocolException, IOException { @@ -62,7 +57,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } return; } - usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); switch (currentState) { case READ_HEADER: { @@ -188,12 +182,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t case READ_COMPRESSED_FRAME: { logger.trace("Running: READ_COMPRESSED_FRAME"); - if (usedDirectMemory + requiredBytes > maxDirectMemory * 0.90) { - ctx.channel().config().setAutoRead(false); - ctx.close(); - closeCalled = true; - throw new IOException("not enough memory to decompress this from " + ctx.channel().id()); - } inflateCompressedFrame(ctx, in, (buffer) -> { transition(States.READ_HEADER); @@ -204,10 +192,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } } finally { decodingCompressedBuffer = false; - ctx.channel().config().setAutoRead(false); - ctx.channel().eventLoop().schedule(() -> { - ctx.channel().config().setAutoRead(true); - }, 5, TimeUnit.MILLISECONDS); transition(States.READ_HEADER); } }); @@ -276,70 +260,6 @@ private void batchComplete() { batch = null; } - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - //System.out.println("channelRead(" + ctx.channel().isActive() + ": " + ctx.channel().id() + ":" + currentState + ":" + decodingCompressedBuffer); - if (closeCalled) { - ((ByteBuf) msg).release(); - //if(batch != null) batch.release(); - return; - } - usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); - - // If we're just beginning a new frame on this channel, - // don't accumulate more data for 25 ms if usage of direct memory is above 20% - // - // The goal here is to avoid thundering herd: many beats connecting and sending data - // at the same time. As some channels progress to other states they'll use more memory - // but also give it back once a full batch is read. - if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) { - if (usedDirectMemory > (maxDirectMemory * 0.40)) { - if (!stopAutoreadRequested) { - ctx.channel().config().setAutoRead(false); - stopAutoreadRequested = true; - logger.info("Set channel {} to autoread FALSE (> 40%)", ctx.channel()); - //System.out.println("pausing reads on " + ctx.channel().id()); - ctx.channel().eventLoop().schedule(() -> { - //System.out.println("resuming reads on " + ctx.channel().id()); - ctx.channel().config().setAutoRead(true); - stopAutoreadRequested = false; - logger.info("Set channel {} to autoread TRUE after 200 ms", ctx.channel()); - }, 200, TimeUnit.MILLISECONDS); - } - } else { - //System.out.println("no need to pause reads on " + ctx.channel().id()); - } - } else if (usedDirectMemory > maxDirectMemory * 0.90) { - ctx.channel().config().setAutoRead(false); - stopAutoreadRequested = true; - logger.info("Set channel {} to autoread FALSE (> 90%)", ctx.channel()); - ctx.close(); - closeCalled = true; - ((ByteBuf) msg).release(); - if (batch != null) batch.release(); - throw new IOException("about to explode, cut them all down " + ctx.channel().id()); - } - super.channelRead(ctx, msg); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); - if (cause instanceof DecoderException) { - ctx.channel().config().setAutoRead(false); - if (!closeCalled) ctx.close(); - } else if (cause instanceof OutOfMemoryError) { - cause.printStackTrace(); - ctx.channel().config().setAutoRead(false); - if (!closeCalled) ctx.close(); - } else if (cause instanceof IOException) { - ctx.channel().config().setAutoRead(false); - if (!closeCalled) ctx.close(); - } else { - super.exceptionCaught(ctx, cause); - } - } - @FunctionalInterface private interface CheckedConsumer { void accept(T t) throws IOException; diff --git a/src/main/java/org/logstash/beats/FlowLimiterHandler.java b/src/main/java/org/logstash/beats/FlowLimiterHandler.java index 5d88effd..6a0da517 100644 --- a/src/main/java/org/logstash/beats/FlowLimiterHandler.java +++ b/src/main/java/org/logstash/beats/FlowLimiterHandler.java @@ -1,20 +1,20 @@ package org.logstash.beats; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** - * Configure the channel where it's installed to operate the read in pull mode, + * Configure the channel where it's installed to operate the reads in pull mode, * disabling the autoread and explicitly invoking the read operation. * The flow control to keep the outgoing buffer under control is done * avoiding to read in new bytes if the outgoing direction became not writable, this * excert back pressure to the TCP layer and ultimately to the upstream system. * */ -@ChannelHandler.Sharable +@Sharable public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter { private final static Logger logger = LogManager.getLogger(FlowLimiterHandler.class); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index d135a548..c791185c 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -133,6 +133,7 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); pipeline.addLast(new FlowLimiterHandler()); + pipeline.addLast(new ThunderingGuardHandler()); pipeline.addLast(new BeatsParser()); pipeline.addLast(new BeatsHandler(localMessageListener)); } diff --git a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java new file mode 100644 index 00000000..0b63e4ab --- /dev/null +++ b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java @@ -0,0 +1,40 @@ +package org.logstash.beats; + +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * This handler is responsible to avoid accepting new connections when the direct memory + * consumption is close to the MaxDirectMemorySize. + * + * If the total allocated direct memory is close to the max memory size and also the pinned + * bytes from the direct memory allocator is close to the direct memory used, then it drops the new + * incoming connections. + * */ +@Sharable +public final class ThunderingGuardHandler extends ChannelInboundHandlerAdapter { + + private final static long MAX_DIRECT_MEMORY = io.netty.util.internal.PlatformDependent.maxDirectMemory(); + + private final static Logger logger = LogManager.getLogger(ThunderingGuardHandler.class); + + @Override + public void channelRegistered(final ChannelHandlerContext ctx) throws Exception { + PooledByteBufAllocator pooledAllocator = (PooledByteBufAllocator) ctx.alloc(); + long usedDirectMemory = pooledAllocator.metric().usedDirectMemory(); + if (usedDirectMemory > MAX_DIRECT_MEMORY * 0.90) { + long pinnedDirectMemory = pooledAllocator.pinnedDirectMemory(); + if (pinnedDirectMemory >= usedDirectMemory * 0.80) { + ctx.close(); + return; + } + } + + super.channelRegistered(ctx); + } +} From 3a9b86d5e582610375b86ca8cfe8a35ab7df50b4 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 12:25:12 +0200 Subject: [PATCH 07/16] Fist draft of the integration test --- .../beats/ThunderingGuardHandler.java | 4 +- .../beats/FlowLimiterHandlerTest.java | 247 ++++++++++++++++++ 2 files changed, 249 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java diff --git a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java index 0b63e4ab..397d8acf 100644 --- a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java +++ b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java @@ -1,7 +1,6 @@ package org.logstash.beats; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -11,7 +10,7 @@ /** * This handler is responsible to avoid accepting new connections when the direct memory * consumption is close to the MaxDirectMemorySize. - * + *

* If the total allocated direct memory is close to the max memory size and also the pinned * bytes from the direct memory allocator is close to the direct memory used, then it drops the new * incoming connections. @@ -31,6 +30,7 @@ public void channelRegistered(final ChannelHandlerContext ctx) throws Exception long pinnedDirectMemory = pooledAllocator.pinnedDirectMemory(); if (pinnedDirectMemory >= usedDirectMemory * 0.80) { ctx.close(); + logger.info("Dropping connection {} due to high resource consumption", ctx.channel()); return; } } diff --git a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java new file mode 100644 index 00000000..c78e5b44 --- /dev/null +++ b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java @@ -0,0 +1,247 @@ +package org.logstash.beats; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.SocketChannelConfig; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class FlowLimiterHandlerTest { + + + @Test + public void testChannelInNotWriteable() { + EmbeddedChannel channel = new EmbeddedChannel(); + assertTrue(channel.isWritable()); + + makeChannelNotWriteablePassingTheOutboundHighWaterMark(channel); + + assertFalse(channel.isWritable()); + } + + private static void makeChannelNotWriteablePassingTheOutboundHighWaterMark(EmbeddedChannel channel) { + int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); + // generate payload to go over high watermark and trigger not writeable + final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); + assertEquals(writeBufferHighWaterMark, payload.readableBytes()); + assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.duplicate())); + assertTrue(channel.finish()); + assertFalse("Channel shouldn't be writeable", channel.isWritable()); + } + + @Test + public void givenAnEmptyChannelThenMessagesCanPassthrough() { + EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()); + + // write some data + final ByteBuf sample = prepareSample(16); + assertTrue(channel.writeInbound(sample.duplicate())); + assertTrue(channel.finish()); + + // read some data from the writeable channel + final ByteBuf readData = channel.readInbound(); + assertNotNull(readData); + assertEquals(sample, readData); + readData.release(); + } + + boolean writeable = true; + + @Test + public void givenANotWriteableChannelNoMessagesPassthrough() { + EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()) { +// @Override +// public boolean isWritable() { +// unsafe().outboundBuffer().remove() +// return writeable; +// } + +// @Override +// protected void doWrite(ChannelOutboundBuffer in) throws Exception { +// for (;;) { +// Object msg = in.current(); +// if (msg == null) { +// break; +// } +// } +// } + +// @Override +// public Channel flush() { +// // avoid to flush the outbound buffer to that is remains not writeable +// return this; +// } + }; + + int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); + // generate payload to go over high watermark and trigger not writeable + final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); + assertEquals(writeBufferHighWaterMark, payload.readableBytes()); +// for (int i = 0; i < 16; i++) { +// assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.copy())); +// } + +// int numBuffers = 0; +// while (channel.isWritable()) { +// assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.copy())); +// numBuffers ++; +// } + +// assertTrue(channel.finish()); +// channel.releaseOutbound(); +// channel.runPendingTasks(); + channel.flushOutbound(); +// writeable = false; + assertFalse("Channel shouldn't be writeable", channel.isWritable()); + + // write some data + final ByteBuf sample = prepareSample(16); + assertTrue(channel.writeInbound(sample.duplicate())); + assertTrue(channel.finish()); + + // read some data from the writeable channel + final ByteBuf readData = channel.readInbound(); + assertNull(readData); + readData.release(); + } + + private ByteBuf prepareSample(int numBytes) { + return prepareSample(numBytes, 'A'); + } + + private static ByteBuf prepareSample(int numBytes, char c) { + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(numBytes); + for (int i = 0; i < numBytes; i++) { + payload.writeByte(c); + } + return payload; + } + + + boolean anyOtherCall = false; + @Test + public void testIntegration() throws Exception { + final int highWaterMark = 32 * 1024; + FlowLimiterHandler sut = new FlowLimiterHandler(); + + NioEventLoopGroup group = new NioEventLoopGroup(); + ServerBootstrap b = new ServerBootstrap(); + try { + b.group(group) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.config().setWriteBufferHighWaterMark(highWaterMark); + ch.pipeline() + .addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + System.out.println("Client connected"); + // write as much to move the channel in not writable state + final ByteBuf payload = prepareSample(highWaterMark, 'C'); + ByteBuf copy = payload.copy(); + int readableBytes = copy.readableBytes(); + ctx.pipeline().writeAndFlush(copy).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + System.out.println("Bytes to be read: " + readableBytes + " now availables:" + copy.readableBytes()); + } + }); +// System.out.println("Bytes to be read: " + readableBytes + " now:" + copy.readableBytes()); +// payload.release(); + + System.out.println("Channel isWritable?" + ctx.channel().isWritable()); + int numBuffers = 0; + while (ctx.channel().isWritable()) { + ctx.pipeline().writeAndFlush(payload.copy()); + numBuffers ++; + } + System.out.println("Num buffers wrote to get the writable false: " + numBuffers); + System.out.println("Channel isWritable? " + ctx.channel().isWritable() + " high water mark: " + ctx.channel().config().getWriteBufferHighWaterMark()); + // ask the client to send some data present on the channel + int receiveBufferSize = ((SocketChannelConfig) ctx.channel().config()).getReceiveBufferSize(); + System.out.println("Server's receive buffer size: " + receiveBufferSize); + clientChannel.writeAndFlush(prepareSample(32)); + } + }) + .addLast(sut) + .addLast(new SimpleChannelInboundHandler() { + boolean firstChunkRead = false; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + System.out.println("Last handler Buffer read size: " + msg.readableBytes()); + if (!firstChunkRead) { + assertEquals("Expect to read a first chunk and no others", 32, msg.readableBytes()); + firstChunkRead = true; + + // client write other data that MUSTN'T be read by the server, because + // is in rate limiting. + clientChannel.writeAndFlush(prepareSample(16)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // on successful flush schedule a shutdown + ctx.channel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + group.shutdownGracefully(); + } + }, 2, TimeUnit.SECONDS); + } + }); + + } else { + // the first read happened, no other reads are commanded by the server + // should never pass here + anyOtherCall = true; + } + } + }); + } + }); + ChannelFuture future = b.bind("0.0.0.0", 1234).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + startAClient(group); + } + } + }).sync(); + future.channel().closeFuture().sync(); + } finally { + group.shutdownGracefully().sync(); + } + + assertFalse("Shouldn't never be notified other data while in rate limiting", anyOtherCall); + } + + Channel clientChannel; + + private void startAClient(NioEventLoopGroup group) { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.config().setAutoRead(false); + clientChannel = ch; + } + }); + b.connect("localhost", 1234); + } + +} \ No newline at end of file From 54420f55fb1c3a847fa142d0228205a042154c47 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 12:26:34 +0200 Subject: [PATCH 08/16] Removed tests using EmbeddedChannel because doesn't manage the writeable status due to that offload every message to the outbound list --- .../beats/FlowLimiterHandlerTest.java | 97 ------------------- 1 file changed, 97 deletions(-) diff --git a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java index c78e5b44..00f6e62b 100644 --- a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java +++ b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java @@ -19,103 +19,6 @@ public class FlowLimiterHandlerTest { - - @Test - public void testChannelInNotWriteable() { - EmbeddedChannel channel = new EmbeddedChannel(); - assertTrue(channel.isWritable()); - - makeChannelNotWriteablePassingTheOutboundHighWaterMark(channel); - - assertFalse(channel.isWritable()); - } - - private static void makeChannelNotWriteablePassingTheOutboundHighWaterMark(EmbeddedChannel channel) { - int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); - // generate payload to go over high watermark and trigger not writeable - final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); - assertEquals(writeBufferHighWaterMark, payload.readableBytes()); - assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.duplicate())); - assertTrue(channel.finish()); - assertFalse("Channel shouldn't be writeable", channel.isWritable()); - } - - @Test - public void givenAnEmptyChannelThenMessagesCanPassthrough() { - EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()); - - // write some data - final ByteBuf sample = prepareSample(16); - assertTrue(channel.writeInbound(sample.duplicate())); - assertTrue(channel.finish()); - - // read some data from the writeable channel - final ByteBuf readData = channel.readInbound(); - assertNotNull(readData); - assertEquals(sample, readData); - readData.release(); - } - - boolean writeable = true; - - @Test - public void givenANotWriteableChannelNoMessagesPassthrough() { - EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()) { -// @Override -// public boolean isWritable() { -// unsafe().outboundBuffer().remove() -// return writeable; -// } - -// @Override -// protected void doWrite(ChannelOutboundBuffer in) throws Exception { -// for (;;) { -// Object msg = in.current(); -// if (msg == null) { -// break; -// } -// } -// } - -// @Override -// public Channel flush() { -// // avoid to flush the outbound buffer to that is remains not writeable -// return this; -// } - }; - - int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); - // generate payload to go over high watermark and trigger not writeable - final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); - assertEquals(writeBufferHighWaterMark, payload.readableBytes()); -// for (int i = 0; i < 16; i++) { -// assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.copy())); -// } - -// int numBuffers = 0; -// while (channel.isWritable()) { -// assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.copy())); -// numBuffers ++; -// } - -// assertTrue(channel.finish()); -// channel.releaseOutbound(); -// channel.runPendingTasks(); - channel.flushOutbound(); -// writeable = false; - assertFalse("Channel shouldn't be writeable", channel.isWritable()); - - // write some data - final ByteBuf sample = prepareSample(16); - assertTrue(channel.writeInbound(sample.duplicate())); - assertTrue(channel.finish()); - - // read some data from the writeable channel - final ByteBuf readData = channel.readInbound(); - assertNull(readData); - readData.release(); - } - private ByteBuf prepareSample(int numBytes) { return prepareSample(numBytes, 'A'); } From b9a8cd4d0a00795a623810c7e06968e662162d2f Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 15:38:08 +0200 Subject: [PATCH 09/16] Reshaped the asynch code to be more linear --- .../beats/FlowLimiterHandlerTest.java | 187 +++++++++++------- 1 file changed, 116 insertions(+), 71 deletions(-) diff --git a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java index 00f6e62b..f268621e 100644 --- a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java +++ b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java @@ -4,22 +4,29 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.*; -import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.junit.Test; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static org.junit.Assert.*; public class FlowLimiterHandlerTest { - private ByteBuf prepareSample(int numBytes) { + private ReadMessagesCollector readMessagesCollector; + + private static ByteBuf prepareSample(int numBytes) { return prepareSample(numBytes, 'A'); } @@ -31,15 +38,101 @@ private static ByteBuf prepareSample(int numBytes, char c) { return payload; } + private ChannelInboundHandlerAdapter onClientConnected(Consumer action) { + return new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + action.accept(ctx); + } + }; + } + + private static class ReadMessagesCollector extends SimpleChannelInboundHandler { + private Channel clientChannel; + private final NioEventLoopGroup group; + boolean firstChunkRead = false; + + ReadMessagesCollector(NioEventLoopGroup group) { + this.group = group; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + if (!firstChunkRead) { + assertEquals("Expect to read a first chunk and no others", 32, msg.readableBytes()); + firstChunkRead = true; + + // client write other data that MUSTN'T be read by the server, because + // is rate limited. + clientChannel.writeAndFlush(prepareSample(16)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // on successful flush schedule a shutdown + ctx.channel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + group.shutdownGracefully(); + } + }, 2, TimeUnit.SECONDS); + } else { + ctx.fireExceptionCaught(future.cause()); + } + } + }); + + } else { + // the first read happened, no other reads are commanded by the server + // should never pass here + fail("Shouldn't never be notified other data while in rate limiting"); + } + } + + public void updateClient(Channel clientChannel) { + assertNotNull(clientChannel); + this.clientChannel = clientChannel; + } + } + + + private static class AssertionsHandler extends ChannelInboundHandlerAdapter { + + private final NioEventLoopGroup group; + + private Throwable lastError; + + public AssertionsHandler(NioEventLoopGroup group) { + this.group = group; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + lastError = cause; + group.shutdownGracefully(); + } + + public void assertNoErrors() { + if (lastError != null) { + if (lastError instanceof AssertionError) { + throw (AssertionError) lastError; + } else { + fail("Failed with error" + lastError); + } + } + } + } - boolean anyOtherCall = false; @Test - public void testIntegration() throws Exception { + public void givenAChannelInNotWriteableStateWhenNewBuffersAreSentByClientThenNoDecodeTakePartOnServerSide() throws Exception { final int highWaterMark = 32 * 1024; FlowLimiterHandler sut = new FlowLimiterHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); + + readMessagesCollector = new ReadMessagesCollector(group); + AssertionsHandler assertionsHandler = new AssertionsHandler(group); try { b.group(group) .channel(NioServerSocketChannel.class) @@ -48,71 +141,15 @@ public void testIntegration() throws Exception { protected void initChannel(SocketChannel ch) throws Exception { ch.config().setWriteBufferHighWaterMark(highWaterMark); ch.pipeline() - .addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - System.out.println("Client connected"); - // write as much to move the channel in not writable state - final ByteBuf payload = prepareSample(highWaterMark, 'C'); - ByteBuf copy = payload.copy(); - int readableBytes = copy.readableBytes(); - ctx.pipeline().writeAndFlush(copy).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - System.out.println("Bytes to be read: " + readableBytes + " now availables:" + copy.readableBytes()); - } - }); -// System.out.println("Bytes to be read: " + readableBytes + " now:" + copy.readableBytes()); -// payload.release(); - - System.out.println("Channel isWritable?" + ctx.channel().isWritable()); - int numBuffers = 0; - while (ctx.channel().isWritable()) { - ctx.pipeline().writeAndFlush(payload.copy()); - numBuffers ++; - } - System.out.println("Num buffers wrote to get the writable false: " + numBuffers); - System.out.println("Channel isWritable? " + ctx.channel().isWritable() + " high water mark: " + ctx.channel().config().getWriteBufferHighWaterMark()); - // ask the client to send some data present on the channel - int receiveBufferSize = ((SocketChannelConfig) ctx.channel().config()).getReceiveBufferSize(); - System.out.println("Server's receive buffer size: " + receiveBufferSize); - clientChannel.writeAndFlush(prepareSample(32)); - } - }) + .addLast(onClientConnected(ctx -> { + // write as much to move the channel in not writable state + fillOutboundWatermark(ctx, highWaterMark); + // ask the client to send some data present on the channel + clientChannel.writeAndFlush(prepareSample(32)); + })) .addLast(sut) - .addLast(new SimpleChannelInboundHandler() { - boolean firstChunkRead = false; - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - System.out.println("Last handler Buffer read size: " + msg.readableBytes()); - if (!firstChunkRead) { - assertEquals("Expect to read a first chunk and no others", 32, msg.readableBytes()); - firstChunkRead = true; - - // client write other data that MUSTN'T be read by the server, because - // is in rate limiting. - clientChannel.writeAndFlush(prepareSample(16)).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // on successful flush schedule a shutdown - ctx.channel().eventLoop().schedule(new Runnable() { - @Override - public void run() { - group.shutdownGracefully(); - } - }, 2, TimeUnit.SECONDS); - } - }); - - } else { - // the first read happened, no other reads are commanded by the server - // should never pass here - anyOtherCall = true; - } - } - }); + .addLast(readMessagesCollector) + .addLast(assertionsHandler); } }); ChannelFuture future = b.bind("0.0.0.0", 1234).addListener(new ChannelFutureListener() { @@ -128,7 +165,14 @@ public void operationComplete(ChannelFuture future) throws Exception { group.shutdownGracefully().sync(); } - assertFalse("Shouldn't never be notified other data while in rate limiting", anyOtherCall); + assertionsHandler.assertNoErrors(); + } + + private static void fillOutboundWatermark(ChannelHandlerContext ctx, int highWaterMark) { + final ByteBuf payload = prepareSample(highWaterMark, 'C'); + while (ctx.channel().isWritable()) { + ctx.pipeline().writeAndFlush(payload.copy()); + } } Channel clientChannel; @@ -142,6 +186,7 @@ private void startAClient(NioEventLoopGroup group) { protected void initChannel(SocketChannel ch) throws Exception { ch.config().setAutoRead(false); clientChannel = ch; + readMessagesCollector.updateClient(clientChannel); } }); b.connect("localhost", 1234); From 534e2da99384ffdc739c1121ab28c52ba7ff29cf Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 18:29:49 +0200 Subject: [PATCH 10/16] Covered the number of connection limiter with unit test --- .../beats/ThunderingGuardHandlerTest.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java diff --git a/src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java b/src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java new file mode 100644 index 00000000..d93f6582 --- /dev/null +++ b/src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java @@ -0,0 +1,83 @@ +package org.logstash.beats; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.ReferenceCounted; +import io.netty.util.internal.PlatformDependent; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +public class ThunderingGuardHandlerTest { + + public static final int MB = 1024 * 1024; + public static final long MAX_DIRECT_MEMORY_BYTES = PlatformDependent.maxDirectMemory(); + + @Test + public void testVerifyDirectMemoryCouldGoBeyondThe90Percent() { + // allocate 90% of direct memory + List allocatedBuffers = allocateDirectMemory(MAX_DIRECT_MEMORY_BYTES, 0.9); + + // allocate one more + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB); + long usedDirectMemory = PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory(); + long pinnedDirectMemory = PooledByteBufAllocator.DEFAULT.pinnedDirectMemory(); + + // verify + assertTrue("Direct memory allocation should be > 90% of the max available", usedDirectMemory > 0.9 * MAX_DIRECT_MEMORY_BYTES); + assertTrue("Direct memory usage should be > 80% of the max available", pinnedDirectMemory > 0.8 * MAX_DIRECT_MEMORY_BYTES); + + allocatedBuffers.forEach(ReferenceCounted::release); + payload.release(); + } + + private static List allocateDirectMemory(long maxDirectMemoryBytes, double percentage) { + List allocatedBuffers = new ArrayList<>(); + final long numBuffersToAllocate = (long) (maxDirectMemoryBytes / MB * percentage); + for (int i = 0; i < numBuffersToAllocate; i++) { + allocatedBuffers.add(PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB)); + } + return allocatedBuffers; + } + + @Test + public void givenUsedDirectMemoryAndPinnedMemoryAreCloseToTheMaxDirectAvailableWhenNewConnectionIsCreatedThenItIsReject() { + EmbeddedChannel channel = new EmbeddedChannel(new ThunderingGuardHandler()); + + // consume > 90% of the direct memory + List allocatedBuffers = allocateDirectMemory(MAX_DIRECT_MEMORY_BYTES, 0.9); + // allocate one more + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB); + + channel.pipeline().fireChannelRegistered(); + + // verify + assertFalse("Under constrained memory new channels has to be forcibly closed", channel.isOpen()); + + allocatedBuffers.forEach(ReferenceCounted::release); + payload.release(); + } + + @Test + public void givenUsedDirectMemoryAndNotPinnedWhenNewConnectionIsCreatedThenItIsAccepted() { + EmbeddedChannel channel = new EmbeddedChannel(new ThunderingGuardHandler()); + + // consume > 90% of the direct memory + List allocatedBuffers = allocateDirectMemory(MAX_DIRECT_MEMORY_BYTES, 0.9); + allocatedBuffers.forEach(ReferenceCounted::release); + // allocate one more + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB); + payload.release(); + + channel.pipeline().fireChannelRegistered(); + + // verify + assertTrue("Despite memory is allocated but not pinned, new connections MUST be accepted", channel.isOpen()); + + } + +} \ No newline at end of file From 25832732ca098cd7f53c51476ea45340401fae96 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 20 Jul 2023 16:54:18 +0200 Subject: [PATCH 11/16] Pessimistic remediation, when a direct OOM happens close the channel --- .../java/org/logstash/beats/BeatsParser.java | 12 +++- .../logstash/beats/OOMConnectionCloser.java | 60 +++++++++++++++++++ src/main/java/org/logstash/beats/Server.java | 3 + 3 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/logstash/beats/OOMConnectionCloser.java diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 635d1772..f9544580 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -2,6 +2,7 @@ import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; @@ -199,7 +200,16 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } case READ_JSON: { logger.trace("Running: READ_JSON"); - ((V2Batch) batch).addMessage(sequence, in, requiredBytes); + try { + ((V2Batch) batch).addMessage(sequence, in, requiredBytes); + } catch (Throwable th) { + // batch has to release its internal buffer before bubbling up the exception + batch.release(); + + // re throw the same error after released the internal buffer + throw th; + } + if (batch.isComplete()) { if (logger.isTraceEnabled()) { logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java new file mode 100644 index 00000000..5ba3effb --- /dev/null +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -0,0 +1,60 @@ +package org.logstash.beats; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class OOMConnectionCloser extends ChannelInboundHandlerAdapter { + + private static class DirectMemoryUsage { + private final long used; + private final long pinned; + private final short ratio; + + private DirectMemoryUsage(long used, long pinned) { + this.used = used; + this.pinned = pinned; + this.ratio = (short) Math.round(((double) pinned / used) * 100); + } + + static DirectMemoryUsage capture() { + PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + long usedDirectMemory = allocator.metric().usedDirectMemory(); + long pinnedDirectMemory = allocator.pinnedDirectMemory(); + return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory); + } + } + + private final static Logger logger = LogManager.getLogger(OOMConnectionCloser.class); + + public static final Pattern DIRECT_MEMORY_ERROR = Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$"); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (isDirectMemoryOOM(cause)) { + DirectMemoryUsage direct = DirectMemoryUsage.capture(); + logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); + logger.warn("Dropping connection {} because run out of direct memory. To fix it, in Filebeat you can" + + "enable slow_start, reduce number of workers, reduce the bulk_max_size or even raise up the -XX:MaxDirectMemorySize " + + "option in the JVM running Logstash", ctx.channel()); + ctx.flush(); + ctx.close(); + } else { + super.exceptionCaught(ctx, cause); + } + } + + private boolean isDirectMemoryOOM(Throwable th) { + if (!(th instanceof OutOfMemoryError)) { + return false; + } + Matcher m = DIRECT_MEMORY_ERROR.matcher(th.getMessage()); + return m.matches(); + } +} \ No newline at end of file diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index c791185c..e456640d 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -1,6 +1,8 @@ package org.logstash.beats; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -135,6 +137,7 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new ThunderingGuardHandler()); pipeline.addLast(new BeatsParser()); + pipeline.addLast(new OOMConnectionCloser()); pipeline.addLast(new BeatsHandler(localMessageListener)); } From 8f3b08c0b1a9560163aa3a580aa4080f5fb74470 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 28 Jul 2023 10:50:38 +0200 Subject: [PATCH 12/16] Removed from the log string any reference to Filebeat --- src/main/java/org/logstash/beats/OOMConnectionCloser.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java index 5ba3effb..7336b666 100644 --- a/src/main/java/org/logstash/beats/OOMConnectionCloser.java +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -40,9 +40,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (isDirectMemoryOOM(cause)) { DirectMemoryUsage direct = DirectMemoryUsage.capture(); logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); - logger.warn("Dropping connection {} because run out of direct memory. To fix it, in Filebeat you can" + - "enable slow_start, reduce number of workers, reduce the bulk_max_size or even raise up the -XX:MaxDirectMemorySize " + - "option in the JVM running Logstash", ctx.channel()); + logger.warn("Dropping connection {} because run out of direct memory. To fix it, check if the upstream source " + + "has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. " + + "Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); ctx.flush(); ctx.close(); } else { From 7ea163f6979653f2af2940b4a89d8e89831438d1 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 28 Jul 2023 11:05:21 +0200 Subject: [PATCH 13/16] Raised up the log level level when dropping connections becuase of thundering and going to OOM condition --- src/main/java/org/logstash/beats/ThunderingGuardHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java index 397d8acf..a880ed35 100644 --- a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java +++ b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java @@ -30,7 +30,7 @@ public void channelRegistered(final ChannelHandlerContext ctx) throws Exception long pinnedDirectMemory = pooledAllocator.pinnedDirectMemory(); if (pinnedDirectMemory >= usedDirectMemory * 0.80) { ctx.close(); - logger.info("Dropping connection {} due to high resource consumption", ctx.channel()); + logger.warn("Dropping connection {} due to high resource consumption", ctx.channel()); return; } } From 50c46252139120c015ea09dfce16a267c714dab2 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Fri, 28 Jul 2023 13:48:33 +0200 Subject: [PATCH 14/16] Better actionable suggestion to user in case of OOM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- src/main/java/org/logstash/beats/OOMConnectionCloser.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java index 7336b666..57a69802 100644 --- a/src/main/java/org/logstash/beats/OOMConnectionCloser.java +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -40,9 +40,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (isDirectMemoryOOM(cause)) { DirectMemoryUsage direct = DirectMemoryUsage.capture(); logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); - logger.warn("Dropping connection {} because run out of direct memory. To fix it, check if the upstream source " + - "has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. " + - "Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); + logger.warn("Dropping connection {} due to lack of available Direct Memory. Please lower the number of concurrent connections or reduce the batch size. " + + "Alternatively, raise -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); ctx.flush(); ctx.close(); } else { From 8283b5724cd00f2372b87d5573e6cf2afe538487 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 25 Aug 2023 16:52:28 +0200 Subject: [PATCH 15/16] Experiment code to run BeatsParser in isolation --- build.gradle | 21 ++++++- .../java/org/logstash/beats/BeatsParser.java | 62 ++++++++++--------- src/main/java/org/logstash/beats/Server.java | 32 +++++++--- .../java/org/logstash/beats/ServerRunner.java | 42 +++++++++++++ 4 files changed, 118 insertions(+), 39 deletions(-) create mode 100644 src/main/java/org/logstash/beats/ServerRunner.java diff --git a/build.gradle b/build.gradle index 02cd2de7..4733acf5 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,14 @@ repositories { dependencies { testImplementation 'junit:junit:4.12' testImplementation 'org.hamcrest:hamcrest-library:1.3' - testImplementation 'org.apache.logging.log4j:log4j-core:2.17.0' + implementation 'org.apache.logging.log4j:log4j-core:2.17.0' + + // needed to run ServerRunner self hosted + implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" + implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonDatabindVersion}" + implementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" + implementation "com.fasterxml.jackson.module:jackson-module-afterburner:2.15.2" + implementation "io.netty:netty-buffer:${nettyVersion}" implementation "io.netty:netty-codec:${nettyVersion}" implementation "io.netty:netty-common:${nettyVersion}" @@ -104,3 +111,15 @@ publishing { } } } + +jar { + duplicatesStrategy(DuplicatesStrategy.EXCLUDE) + + manifest { + attributes "Main-Class": "org.logstash.beats.ServerRunner" + } + + from { + configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) } + } +} diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index f9544580..f0d49fb0 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -2,12 +2,10 @@ import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.DecoderException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -17,7 +15,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.zip.Inflater; import java.util.zip.InflaterOutputStream; @@ -59,9 +56,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t return; } + if (!ctx.channel().isOpen()) { + logger.info("Channel is not open, {}", ctx.channel()); + } + switch (currentState) { case READ_HEADER: { - logger.trace("Running: READ_HEADER"); + logger.trace("Running: READ_HEADER {}", ctx.channel()); int version = Protocol.version(in.readByte()); @@ -74,7 +75,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t batch = new V1Batch(); } } - transition(States.READ_FRAME_TYPE); + transition(States.READ_FRAME_TYPE, ctx.channel()); break; } case READ_FRAME_TYPE: { @@ -82,20 +83,20 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t switch(frameType) { case Protocol.CODE_WINDOW_SIZE: { - transition(States.READ_WINDOW_SIZE); + transition(States.READ_WINDOW_SIZE, ctx.channel()); break; } case Protocol.CODE_JSON_FRAME: { // Reading Sequence + size of the payload - transition(States.READ_JSON_HEADER); + transition(States.READ_JSON_HEADER, ctx.channel()); break; } case Protocol.CODE_COMPRESSED_FRAME: { - transition(States.READ_COMPRESSED_FRAME_HEADER); + transition(States.READ_COMPRESSED_FRAME_HEADER, ctx.channel()); break; } case Protocol.CODE_FRAME: { - transition(States.READ_DATA_FIELDS); + transition(States.READ_DATA_FIELDS, ctx.channel()); break; } default: { @@ -105,8 +106,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t break; } case READ_WINDOW_SIZE: { - logger.trace("Running: READ_WINDOW_SIZE"); - batch.setBatchSize((int) in.readUnsignedInt()); + logger.trace("Running: READ_WINDOW_SIZE {}", ctx.channel()); + int batchSize = (int) in.readUnsignedInt(); + logger.trace("Batch size: {} - channel {}", batchSize, ctx.channel()); + batch.setBatchSize(batchSize); // This is unlikely to happen but I have no way to known when a frame is // actually completely done other than checking the windows and the sequence number, @@ -118,12 +121,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t batchComplete(); } - transition(States.READ_HEADER); + transition(States.READ_HEADER, ctx.channel()); break; } case READ_DATA_FIELDS: { // Lumberjack version 1 protocol, which use the Key:Value format. - logger.trace("Running: READ_DATA_FIELDS"); + logger.trace("Running: READ_DATA_FIELDS {}", ctx.channel()); sequence = (int) in.readUnsignedInt(); int fieldsCount = (int) in.readUnsignedInt(); int count = 0; @@ -156,35 +159,36 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t out.add(batch); batchComplete(); } - transition(States.READ_HEADER); + transition(States.READ_HEADER, ctx.channel()); break; } case READ_JSON_HEADER: { - logger.trace("Running: READ_JSON_HEADER"); + logger.trace("Running: READ_JSON_HEADER {}", ctx.channel()); sequence = (int) in.readUnsignedInt(); + logger.trace("Sequence num to read {} for channel {}", sequence, ctx.channel()); int jsonPayloadSize = (int) in.readUnsignedInt(); if(jsonPayloadSize <= 0) { throw new InvalidFrameProtocolException("Invalid json length, received: " + jsonPayloadSize); } - transition(States.READ_JSON, jsonPayloadSize); + transition(States.READ_JSON, jsonPayloadSize, ctx.channel()); break; } case READ_COMPRESSED_FRAME_HEADER: { - logger.trace("Running: READ_COMPRESSED_FRAME_HEADER"); + logger.trace("Running: READ_COMPRESSED_FRAME_HEADER {}", ctx.channel()); - transition(States.READ_COMPRESSED_FRAME, in.readInt()); + transition(States.READ_COMPRESSED_FRAME, in.readInt(), ctx.channel()); break; } case READ_COMPRESSED_FRAME: { - logger.trace("Running: READ_COMPRESSED_FRAME"); + logger.trace("Running: READ_COMPRESSED_FRAME {}", ctx.channel()); inflateCompressedFrame(ctx, in, (buffer) -> { - transition(States.READ_HEADER); + transition(States.READ_HEADER, ctx.channel()); decodingCompressedBuffer = true; try { @@ -193,13 +197,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } } finally { decodingCompressedBuffer = false; - transition(States.READ_HEADER); + transition(States.READ_HEADER, ctx.channel()); } }); break; } case READ_JSON: { - logger.trace("Running: READ_JSON"); + logger.trace("Running: READ_JSON {}", ctx.channel()); try { ((V2Batch) batch).addMessage(sequence, in, requiredBytes); } catch (Throwable th) { @@ -212,13 +216,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t if (batch.isComplete()) { if (logger.isTraceEnabled()) { - logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); + logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence + " {}", ctx.channel()); } out.add(batch); batchComplete(); } - transition(States.READ_HEADER); + transition(States.READ_HEADER, ctx.channel()); break; } } @@ -252,13 +256,13 @@ private boolean hasEnoughBytes(ByteBuf in) { return in.readableBytes() >= requiredBytes; } - private void transition(States next) { - transition(next, next.length); + private void transition(States next, Channel ch) { + transition(next, next.length, ch); } - private void transition(States nextState, int requiredBytes) { + private void transition(States nextState, int requiredBytes, Channel ch) { if (logger.isTraceEnabled()) { - logger.trace("Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes"); + logger.trace("Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes {}", ch); } this.currentState = nextState; this.requiredBytes = requiredBytes; diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index e456640d..8e7786b6 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -130,15 +130,29 @@ public void initChannel(SocketChannel socket){ if (isSslEnabled()) { pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); } - pipeline.addLast(idleExecutorGroup, IDLESTATE_HANDLER, - new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); - pipeline.addLast(BEATS_ACKER, new AckEncoder()); - pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); - pipeline.addLast(new FlowLimiterHandler()); - pipeline.addLast(new ThunderingGuardHandler()); - pipeline.addLast(new BeatsParser()); - pipeline.addLast(new OOMConnectionCloser()); - pipeline.addLast(new BeatsHandler(localMessageListener)); +// pipeline.addLast(idleExecutorGroup, IDLESTATE_HANDLER, +// new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); +// pipeline.addLast(BEATS_ACKER, new AckEncoder()); +// pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); + +// pipeline.addLast(new FlowLimiterHandler()); +// pipeline.addLast(new ThunderingGuardHandler()); + pipeline.addLast("beats parser", new BeatsParser()); +// pipeline.addLast(new OOMConnectionCloser()); +// pipeline.addLast("beats handler", new BeatsHandler(localMessageListener)); + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.warn("Exception {} received on {}", cause.getMessage(), ctx.channel()); +// pipeline.remove("beats parser"); +// if (cause instanceof OutOfMemoryError) { + ctx.channel().close(); +// } + super.exceptionCaught(ctx, cause); + } + }); + + logger.info("Starting with handlers: {}", pipeline.names()); } diff --git a/src/main/java/org/logstash/beats/ServerRunner.java b/src/main/java/org/logstash/beats/ServerRunner.java new file mode 100644 index 00000000..d6b39ccc --- /dev/null +++ b/src/main/java/org/logstash/beats/ServerRunner.java @@ -0,0 +1,42 @@ +package org.logstash.beats; + +import io.netty.channel.ChannelHandlerContext; + +public class ServerRunner { + + public static void main(String[] args) throws InterruptedException { + int clientInactivityTimeoutSeconds = 60; + Server server = new Server("127.0.0.1", 3334, clientInactivityTimeoutSeconds, Runtime.getRuntime().availableProcessors()); + + // no TLS + + server.setMessageListener(new IMessageListener() { + @Override + public void onNewMessage(ChannelHandlerContext ctx, Message message) { + + } + + @Override + public void onNewConnection(ChannelHandlerContext ctx) { + + } + + @Override + public void onConnectionClose(ChannelHandlerContext ctx) { + + } + + @Override + public void onException(ChannelHandlerContext ctx, Throwable cause) { + + } + + @Override + public void onChannelInitializeException(ChannelHandlerContext ctx, Throwable cause) { + + } + }); + // blocking till the end + server.listen(); + } +} From c1179f58557aa699e8ff8f31d68fa7735df95eae Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 11 Sep 2023 16:27:57 +0200 Subject: [PATCH 16/16] Added TLS listening --- .../java/org/logstash/beats/ServerRunner.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/logstash/beats/ServerRunner.java b/src/main/java/org/logstash/beats/ServerRunner.java index d6b39ccc..3b263a89 100644 --- a/src/main/java/org/logstash/beats/ServerRunner.java +++ b/src/main/java/org/logstash/beats/ServerRunner.java @@ -1,12 +1,27 @@ package org.logstash.beats; import io.netty.channel.ChannelHandlerContext; +import org.logstash.netty.SslContextBuilder; +import org.logstash.netty.SslHandlerProvider; public class ServerRunner { - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws Exception { int clientInactivityTimeoutSeconds = 60; - Server server = new Server("127.0.0.1", 3334, clientInactivityTimeoutSeconds, Runtime.getRuntime().availableProcessors()); + Server server = new Server("127.0.0.1", 3333, clientInactivityTimeoutSeconds, Runtime.getRuntime().availableProcessors()); + + // enable TLS + System.out.println("Using SSL"); + + String sslCertificate = "/Users/andrea/workspace/certificates/client_from_root.crt"; + String sslKey = "/Users/andrea/workspace/certificates/client_from_root.key.pkcs8"; + String[] certificateAuthorities = new String[] { "/Users/andrea/workspace/certificates/root.crt" }; + + SslContextBuilder sslBuilder = new SslContextBuilder(sslCertificate, sslKey, null) + .setProtocols(new String[] { "TLSv1.2", "TLSv1.3" }) + .setClientAuthentication(SslContextBuilder.SslClientVerifyMode.REQUIRED, certificateAuthorities); + SslHandlerProvider sslHandlerProvider = new SslHandlerProvider(sslBuilder.buildContext(), 10000); + server.setSslHandlerProvider(sslHandlerProvider); // no TLS