From 30ff400614b37abcd645f063d6e29778ee7fba77 Mon Sep 17 00:00:00 2001
From: Daniel Siviter
Date: Sun, 7 Mar 2021 19:45:15 +0000
Subject: [PATCH] Trialing Disruptor #6
---
benchmarks/pom.xml | 37 +++---
.../logging/ConsoleHandlerBenchmark.java | 2 +-
.../logging/DisruptorAsyncConsoleHandler.java | 28 +++++
.../dansiviter/logging/DisruptorEmitter.java | 67 ++++++++++
.../uk/dansiviter/logging/RoughBenchmark.java | 2 +-
core/pom.xml | 6 -
.../logging/AsyncConsoleHandler.java | 8 +-
.../uk/dansiviter/logging/AsyncHandler.java | 119 ++++++------------
.../logging/AsyncStreamHandler.java | 7 +-
.../uk/dansiviter/logging/DefaultEmitter.java | 67 ++++++++++
.../java/uk/dansiviter/logging/Emitter.java | 10 ++
.../dansiviter/logging/AsyncHandlerTest.java | 5 +
12 files changed, 251 insertions(+), 107 deletions(-)
create mode 100644 benchmarks/src/main/java/uk/dansiviter/logging/DisruptorAsyncConsoleHandler.java
create mode 100644 benchmarks/src/main/java/uk/dansiviter/logging/DisruptorEmitter.java
create mode 100644 core/src/main/java/uk/dansiviter/logging/DefaultEmitter.java
create mode 100644 core/src/main/java/uk/dansiviter/logging/Emitter.java
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 43ab11a..596785b 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -32,30 +32,29 @@
jmh-generator-annprocess
1.27
+
+
+ com.lmax
+ disruptor
+ 3.4.2
+
- org.apache.maven.plugins
- maven-shade-plugin
- 3.2.4
-
-
- package
-
- shade
-
-
-
-
- uk.dansiviter.logging.JmhMain
-
-
-
-
-
-
+ com.google.cloud.tools
+ jib-maven-plugin
+ 2.8.0
+
+
+ openjdk:16-jdk-alpine
+
+
+ uk.dansiviter.logging.JmhMain
+
+
+
diff --git a/benchmarks/src/main/java/uk/dansiviter/logging/ConsoleHandlerBenchmark.java b/benchmarks/src/main/java/uk/dansiviter/logging/ConsoleHandlerBenchmark.java
index c2c4440..c1c231e 100644
--- a/benchmarks/src/main/java/uk/dansiviter/logging/ConsoleHandlerBenchmark.java
+++ b/benchmarks/src/main/java/uk/dansiviter/logging/ConsoleHandlerBenchmark.java
@@ -33,7 +33,7 @@
public class ConsoleHandlerBenchmark {
@State(Scope.Benchmark)
public static class BenchmarkState {
- @Param({ "java.util.logging.ConsoleHandler", "uk.dansiviter.logging.AsyncConsoleHandler" })
+ @Param({ "java.util.logging.ConsoleHandler", "uk.dansiviter.logging.AsyncConsoleHandler", "uk.dansiviter.logging.DisruptorAsyncConsoleHandler" })
public String handlerName;
private Logger log;
diff --git a/benchmarks/src/main/java/uk/dansiviter/logging/DisruptorAsyncConsoleHandler.java b/benchmarks/src/main/java/uk/dansiviter/logging/DisruptorAsyncConsoleHandler.java
new file mode 100644
index 0000000..72b645b
--- /dev/null
+++ b/benchmarks/src/main/java/uk/dansiviter/logging/DisruptorAsyncConsoleHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2021 Daniel Siviter
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.dansiviter.logging;
+
+import java.util.Optional;
+import java.util.logging.ConsoleHandler;
+
+/**
+ * Async implementation of {@link ConsoleHandler} which simply delegates.
+ */
+public class DisruptorAsyncConsoleHandler extends AsyncStreamHandler {
+ public DisruptorAsyncConsoleHandler() {
+ super(Optional.of(new DisruptorEmitter()), new ConsoleHandler());
+ }
+}
diff --git a/benchmarks/src/main/java/uk/dansiviter/logging/DisruptorEmitter.java b/benchmarks/src/main/java/uk/dansiviter/logging/DisruptorEmitter.java
new file mode 100644
index 0000000..ae4677f
--- /dev/null
+++ b/benchmarks/src/main/java/uk/dansiviter/logging/DisruptorEmitter.java
@@ -0,0 +1,67 @@
+package uk.dansiviter.logging;
+
+import java.util.concurrent.Flow;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.ErrorManager;
+import java.util.logging.LogRecord;
+
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+
+public class DisruptorEmitter implements Emitter {
+ private static final int MAX_DRAIN = 5;
+ private AsyncHandler handler;
+ private Disruptor disruptor;
+
+ @Override
+ public void init(AsyncHandler handler) {
+ this.handler = handler;
+ var maxBuffer = handler.property("maxBuffer").map(Integer::parseInt).orElseGet(Flow::defaultBufferSize);
+ this.disruptor = new Disruptor<>(LogEvent::new, maxBuffer, DaemonThreadFactory.INSTANCE); //, ProducerType.MULTI, new YieldingWaitStrategy());
+ this.disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
+ this.handler.doPublish(event.record);
+ });
+ this.disruptor.start();
+ }
+
+ @Override
+ public int submit(LogRecord item) {
+ this.disruptor.getRingBuffer().publishEvent((e, sequence, r) -> e.record = r, item);
+ return 1;
+ }
+
+ @Override
+ public void close() {
+
+ try {
+ for (int i = 0; hasBacklog(this.disruptor.getRingBuffer()) && i < MAX_DRAIN; i++) {
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException e) {
+ this.handler.getErrorManager().error("Drain interrupted!", e, ErrorManager.CLOSE_FAILURE);
+ return;
+ }
+
+ try {
+ disruptor.shutdown(10, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ this.handler.getErrorManager().error("Shutdown timed out!", e, ErrorManager.CLOSE_FAILURE);
+ this.disruptor.halt();
+ }
+ }
+
+ // --- Static Methods ---
+
+ private static boolean hasBacklog(RingBuffer> buf) {
+ return !buf.hasAvailableCapacity(buf.getBufferSize());
+ }
+
+ // --- Inner Classes ---
+
+ private static class LogEvent {
+ private LogRecord record;
+ }
+}
diff --git a/benchmarks/src/main/java/uk/dansiviter/logging/RoughBenchmark.java b/benchmarks/src/main/java/uk/dansiviter/logging/RoughBenchmark.java
index 7aa44e5..cab7f35 100644
--- a/benchmarks/src/main/java/uk/dansiviter/logging/RoughBenchmark.java
+++ b/benchmarks/src/main/java/uk/dansiviter/logging/RoughBenchmark.java
@@ -34,7 +34,7 @@ public static void main(String[] args) throws InterruptedException {
}
var syncHandler = new ConsoleHandler();
- var asyncHandler = new AsyncConsoleHandler();
+ var asyncHandler = new DisruptorAsyncConsoleHandler();
var log = Logger.getLogger("TEST");
diff --git a/core/pom.xml b/core/pom.xml
index 8f5d190..5ccec0d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -9,12 +9,6 @@
core
-
- com.lmax
- disruptor
- 3.4.2
-
-
org.awaitility
awaitility
diff --git a/core/src/main/java/uk/dansiviter/logging/AsyncConsoleHandler.java b/core/src/main/java/uk/dansiviter/logging/AsyncConsoleHandler.java
index d634cfa..8a55723 100644
--- a/core/src/main/java/uk/dansiviter/logging/AsyncConsoleHandler.java
+++ b/core/src/main/java/uk/dansiviter/logging/AsyncConsoleHandler.java
@@ -15,13 +15,19 @@
*/
package uk.dansiviter.logging;
+import java.util.Optional;
import java.util.logging.ConsoleHandler;
+import java.util.logging.LogRecord;
/**
* Async implementation of {@link ConsoleHandler} which simply delegates.
*/
public class AsyncConsoleHandler extends AsyncStreamHandler {
public AsyncConsoleHandler() {
- super(new ConsoleHandler());
+ this(Optional.empty());
+ }
+
+ public AsyncConsoleHandler(Optional> emitter) {
+ super(emitter, new ConsoleHandler());
}
}
diff --git a/core/src/main/java/uk/dansiviter/logging/AsyncHandler.java b/core/src/main/java/uk/dansiviter/logging/AsyncHandler.java
index 2afa7af..a4931fa 100644
--- a/core/src/main/java/uk/dansiviter/logging/AsyncHandler.java
+++ b/core/src/main/java/uk/dansiviter/logging/AsyncHandler.java
@@ -20,7 +20,6 @@
import java.io.UnsupportedEncodingException;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.ErrorManager;
import java.util.logging.Filter;
@@ -33,72 +32,68 @@
import javax.annotation.Nonnull;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.util.DaemonThreadFactory;
-
/**
- * An abstract {@link Handler} that asynchronously delivers log messages. This
- * leverages {@link java.util.concurrent.Flow} to handle log events.
- * Back-pressure is handled by blocking the calling thread if the buffer is
- * full. Therefore, to avoid a significant thread hang ensure the processing is
- * done in a timely manner.
+ * An abstract {@link Handler} that asynchronously delivers log messages. This leverages
+ * {@link java.util.concurrent.Flow} to handle log events. Back-pressure is handled by blocking the
+ * calling thread if the buffer is full. Therefore, to avoid a significant thread hang ensure the processing is done in
+ * a timely manner.
*
- * Configuration: Using the following {@code LogManager} configuration
- * properties, where {@code } refers to the fully-qualified class
- * name of the handler:
+ * Configuration:
+ * Using the following {@code LogManager} configuration properties, where {@code } refers to the
+ * fully-qualified class name of the handler:
*
- * - {@code <handler-name>.level} specifies the default level for the
- * {@code Handler} (defaults to {@code INFO}).
- * - {@code <handler-name>.filter} specifies the name of a
- * {@code Filter} class to use (defaults to no {@code Filter}).
- * - {@code <handler-name>.formatter} specifies the name of a
- * {@code Formatter} class to use (defaults to
- * {@link java.util.logging.SimpleFormatter}).
- * - {@code <handler-name>.encoding} the name of the character set
- * encoding to use (defaults to the default platform encoding).
- * - {@code <handler-name>.maxBuffer} specifies the maximum buffer size
- * level for the handler (defaults to
- * {@link java.util.concurrent.Flow#defaultBufferSize()}).
+ * - {@code <handler-name>.level}
+ * specifies the default level for the {@code Handler}
+ * (defaults to {@code INFO}).
+ * - {@code <handler-name>.filter}
+ * specifies the name of a {@code Filter} class to use
+ * (defaults to no {@code Filter}).
+ * - {@code <handler-name>.formatter}
+ * specifies the name of a {@code Formatter} class to use
+ * (defaults to {@link java.util.logging.SimpleFormatter}).
+ * - {@code <handler-name>.encoding}
+ * the name of the character set encoding to use (defaults to
+ * the default platform encoding).
+ * - {@code <handler-name>.maxBuffer}
+ * specifies the maximum buffer size level for the handler
+ * (defaults to {@link java.util.concurrent.Flow#defaultBufferSize()}).
*
*/
public abstract class AsyncHandler extends Handler {
- private static final int MAX_DRAIN = 5;
- private final Disruptor disruptor;
-
protected final AtomicBoolean closed = new AtomicBoolean();
- public AsyncHandler() {
- var manager = Objects.requireNonNull(LogManager.getLogManager());
+ private final LogManager manager;
+ private final Emitter emitter;
+
- setLevel(property(manager, "level").map(Level::parse).orElse(Level.INFO));
- setFilter(property(manager, "filter").map(AsyncHandler::instance).orElse(null));
- setFormatter(property(manager, "formatter").map(AsyncHandler::instance).orElseGet(SimpleFormatter::new));
+ public AsyncHandler(Optional> emitter) {
+ manager = Objects.requireNonNull(LogManager.getLogManager());
+
+ setLevel(property("level").map(Level::parse).orElse(Level.INFO));
+ setFilter(property("filter").map(AsyncHandler::instance).orElse(null));
+ setFormatter(property("formatter").map(AsyncHandler::instance).orElseGet(SimpleFormatter::new));
try {
- setEncoding(property(manager, "encoding").orElse(null));
+ setEncoding(property("encoding").orElse(null));
} catch (UnsupportedEncodingException e) {
getErrorManager().error(e.getMessage(), e, ErrorManager.OPEN_FAILURE);
}
- var maxBuffer = property(manager, "maxBuffer").map(Integer::parseInt).orElse(1024);
-
- this.disruptor = new Disruptor<>(LogEvent::new, maxBuffer, DaemonThreadFactory.INSTANCE); //, ProducerType.MULTI, new YieldingWaitStrategy());
- this.disruptor.handleEventsWith((event, sequence, endOfBatch) -> doPublish(event.record));
- this.disruptor.start();
+ this.emitter = emitter.orElseGet(() -> new DefaultEmitter());
+ this.emitter.init(this);
}
/**
* Extracts the {@link LogManager#getProperty(String)}.
*
* @param manager the manager instance.
- * @param name the name of the property.
+ * @param name the name of the property.
* @return the value as an {@link Optional}.
*/
- protected Optional property(@Nonnull LogManager manager, @Nonnull String name) {
+ Optional property(@Nonnull String name) {
return Optional.ofNullable(manager.getProperty(getClass().getName() + "." + name));
}
+
// --- Static Methods ---
@SuppressWarnings("unchecked")
@@ -113,17 +108,13 @@ protected Optional property(@Nonnull LogManager manager, @Nonnull String
@Override
public void publish(LogRecord record) {
- if (this.closed.get()) {
- throw new IllegalStateException("Handler closed!");
- }
-
if (!isLoggable(record)) {
return;
}
- record.getSourceClassName(); // ensure source is populated
+ record.getSourceClassName(); // ensure source is populated
- this.disruptor.getRingBuffer().publishEvent((e, sequence, r) -> e.record = r, record);
+ this.emitter.submit(record);
}
/**
@@ -134,41 +125,13 @@ public void publish(LogRecord record) {
protected abstract void doPublish(LogRecord record);
@Override
- public void flush() {
- }
+ public void flush() { }
@Override
public void close() throws SecurityException {
if (!closed.compareAndSet(false, true)) {
throw new IllegalStateException("Already closed!");
}
-
- try {
- for (int i = 0; hasBacklog(this.disruptor.getRingBuffer()) && i < MAX_DRAIN; i++) {
- Thread.sleep(100);
- }
- } catch (InterruptedException e) {
- getErrorManager().error("Drain interrupted!", e, ErrorManager.CLOSE_FAILURE);
- return;
- }
-
- try {
- disruptor.shutdown(10, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- getErrorManager().error("Shutdown timed out!", e, ErrorManager.CLOSE_FAILURE);
- this.disruptor.halt();
- }
- }
-
- // --- Static Methods ---
-
- private static boolean hasBacklog(RingBuffer> buf) {
- return !buf.hasAvailableCapacity(buf.getBufferSize());
- }
-
- // --- Inner Classes ---
-
- private static class LogEvent {
- private LogRecord record;
+ this.emitter.close();
}
}
diff --git a/core/src/main/java/uk/dansiviter/logging/AsyncStreamHandler.java b/core/src/main/java/uk/dansiviter/logging/AsyncStreamHandler.java
index fdeb2fc..3f62f0a 100644
--- a/core/src/main/java/uk/dansiviter/logging/AsyncStreamHandler.java
+++ b/core/src/main/java/uk/dansiviter/logging/AsyncStreamHandler.java
@@ -17,6 +17,7 @@
import java.io.UnsupportedEncodingException;
import java.util.Objects;
+import java.util.Optional;
import java.util.logging.ErrorManager;
import java.util.logging.LogRecord;
import java.util.logging.StreamHandler;
@@ -29,7 +30,11 @@
public abstract class AsyncStreamHandler extends AsyncHandler {
protected final StreamHandler delegate;
- public AsyncStreamHandler(@Nonnull StreamHandler delegate) {
+ public AsyncStreamHandler(
+ Optional> emitter,
+ @Nonnull StreamHandler delegate)
+ {
+ super(emitter);
this.delegate = Objects.requireNonNull(delegate);
this.delegate.setLevel(getLevel());
this.delegate.setFilter(getFilter());
diff --git a/core/src/main/java/uk/dansiviter/logging/DefaultEmitter.java b/core/src/main/java/uk/dansiviter/logging/DefaultEmitter.java
new file mode 100644
index 0000000..5f071d1
--- /dev/null
+++ b/core/src/main/java/uk/dansiviter/logging/DefaultEmitter.java
@@ -0,0 +1,67 @@
+package uk.dansiviter.logging;
+
+import static java.util.concurrent.ForkJoinPool.commonPool;
+
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.logging.ErrorManager;
+import java.util.logging.LogRecord;
+import java.util.concurrent.SubmissionPublisher;
+
+
+public class DefaultEmitter implements Emitter {
+ private final Subscriber subscriber = new LogSubscriber();
+
+ private AsyncHandler handler;
+ private SubmissionPublisher delegate;
+
+ @Override
+ public void init(AsyncHandler handler) {
+ this.handler = handler;
+ var maxBuffer = handler.property("maxBuffer").map(Integer::parseInt).orElseGet(Flow::defaultBufferSize);
+ this.delegate = new SubmissionPublisher<>(commonPool(), maxBuffer);
+ this.delegate.subscribe(this.subscriber);
+ }
+
+ @Override
+ public int submit(LogRecord item) {
+ return this.delegate.submit(item);
+ }
+
+ @Override
+ public void close() {
+ this.delegate.close();
+ }
+
+ // --- Inner Classes ---
+
+ /**
+ *
+ */
+ private class LogSubscriber implements Subscriber {
+ private Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ this.subscription.request(1);
+ }
+
+ @Override
+ public void onNext(LogRecord item) {
+ handler.doPublish(item);
+ this.subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ handler.getErrorManager().error(t.getMessage(), new Exception(t), ErrorManager.GENERIC_FAILURE);
+ }
+
+ @Override
+ public void onComplete() {
+ // Nothing to see here
+ }
+ }
+}
diff --git a/core/src/main/java/uk/dansiviter/logging/Emitter.java b/core/src/main/java/uk/dansiviter/logging/Emitter.java
new file mode 100644
index 0000000..8f7590c
--- /dev/null
+++ b/core/src/main/java/uk/dansiviter/logging/Emitter.java
@@ -0,0 +1,10 @@
+package uk.dansiviter.logging;
+
+public interface Emitter extends AutoCloseable {
+ void init(AsyncHandler handler);
+
+ int submit(T item);
+
+ @Override
+ void close();
+}
diff --git a/core/src/test/java/uk/dansiviter/logging/AsyncHandlerTest.java b/core/src/test/java/uk/dansiviter/logging/AsyncHandlerTest.java
index 96af4cf..d690fc1 100644
--- a/core/src/test/java/uk/dansiviter/logging/AsyncHandlerTest.java
+++ b/core/src/test/java/uk/dansiviter/logging/AsyncHandlerTest.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
@@ -81,6 +82,10 @@ public void after() {
private static class TestHandler extends AsyncHandler {
private final List records = new ArrayList<>();
+ public TestHandler() {
+ super(Optional.empty());
+ }
+
@Override
protected void doPublish(LogRecord record) {
records.add(record);