diff --git a/src/test/java/com/teragrep/rlp_03/ManualPerformanceTest.java b/src/test/java/com/teragrep/rlp_03/ManualPerformanceTest.java index 88317ae6..e6d460fe 100644 --- a/src/test/java/com/teragrep/rlp_03/ManualPerformanceTest.java +++ b/src/test/java/com/teragrep/rlp_03/ManualPerformanceTest.java @@ -49,7 +49,6 @@ import com.teragrep.rlp_03.eventloop.EventLoop; import com.teragrep.rlp_03.eventloop.EventLoopFactory; import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate; -import com.teragrep.rlp_03.frame.delegate.FrameContext; import com.teragrep.rlp_03.frame.delegate.FrameDelegate; import com.teragrep.rlp_03.server.Server; import com.teragrep.rlp_03.server.ServerFactory; @@ -63,12 +62,12 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.Supplier; public class ManualPerformanceTest { private static final Logger LOGGER = LoggerFactory.getLogger(ManualPerformanceTest.class); + final AtomicLong recordCount = new AtomicLong(); @Test // for testing with manual tools @EnabledIfSystemProperty( @@ -86,11 +85,12 @@ public void runServerTest() throws InterruptedException, IOException { int port = Integer.parseInt(System.getProperty("ServerPerformanceTestPort", "1601")); LOGGER.info("Starting ManualPerformanceTest with threads <{}> at port <{}>", threads, port); - final FrameConsumer frameConsumer = new FrameConsumer(); Supplier frameDelegateSupplier = () -> { LOGGER.info("requested a new frameDelegate instance "); - return new DefaultFrameDelegate(frameConsumer); + return new DefaultFrameDelegate(frameContext -> { + recordCount.incrementAndGet(); + }); }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( @@ -108,61 +108,41 @@ public void runServerTest() throws InterruptedException, IOException { ); Server server = serverFactory.create(port); - final Reporter reporter = new Reporter(server, frameConsumer, threadPoolExecutor); - ; + final Reporter reporter = new Reporter(server, recordCount, threadPoolExecutor); Thread reporterThread = new Thread(reporter); reporterThread.start(); + Thread.sleep(Long.MAX_VALUE); + eventLoop.stop(); threadPoolExecutor.shutdown(); Assertions.assertAll(eventLoopThread::join); reporterThread.join(); } - private static class FrameConsumer implements Consumer, AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(FrameConsumer.class); - - FrameConsumer() { - LOGGER.info("creating ByteConsumer"); - } - - final AtomicLong atomicLong = new AtomicLong(); - - @Override - public void accept(FrameContext frameServerRX) { - atomicLong.incrementAndGet(); - } - - @Override - public void close() throws Exception { - LOGGER.info("closing ByteConsumer"); - } - } - private static class Reporter implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(Reporter.class); final Server server; - final FrameConsumer byteConsumer; final AtomicBoolean stop = new AtomicBoolean(); final long interval = 5000; final ThreadPoolExecutor threadPoolExecutor; + final AtomicLong recordCount; - public Reporter(Server server, FrameConsumer byteConsumer, ThreadPoolExecutor threadPoolExecutor) { + public Reporter(Server server, AtomicLong recordCount, ThreadPoolExecutor threadPoolExecutor) { this.server = server; - this.byteConsumer = byteConsumer; + this.recordCount = recordCount; this.threadPoolExecutor = threadPoolExecutor; } @Override public void run() { while (!stop.get()) { - long start = byteConsumer.atomicLong.get(); + long start = recordCount.get(); try { Thread.sleep(interval); @@ -170,7 +150,7 @@ public void run() { catch (InterruptedException e) { continue; } - long end = byteConsumer.atomicLong.get(); + long end = recordCount.get(); long rate = (end - start) / (interval / 1000);