Skip to content

Commit

Permalink
Trialing Disruptor #6
Browse files Browse the repository at this point in the history
  • Loading branch information
dansiviter committed Mar 7, 2021
1 parent 3715a49 commit 30ff400
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 107 deletions.
37 changes: 18 additions & 19 deletions benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,29 @@
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.27</version>
</dependency>

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>uk.dansiviter.logging.JmhMain</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>2.8.0</version>
<configuration>
<from>
<image>openjdk:16-jdk-alpine</image>
</from>
<container>
<mainClass>uk.dansiviter.logging.JmhMain</mainClass>
</container>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class ConsoleHandlerBenchmark {
@State(Scope.Benchmark)
public static class BenchmarkState {
@Param({ "java.util.logging.ConsoleHandler", "uk.dansiviter.logging.AsyncConsoleHandler" })
@Param({ "java.util.logging.ConsoleHandler", "uk.dansiviter.logging.AsyncConsoleHandler", "uk.dansiviter.logging.DisruptorAsyncConsoleHandler" })
public String handlerName;

private Logger log;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2021 Daniel Siviter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.dansiviter.logging;

import java.util.Optional;
import java.util.logging.ConsoleHandler;

/**
* Async implementation of {@link ConsoleHandler} which simply delegates.
*/
public class DisruptorAsyncConsoleHandler extends AsyncStreamHandler {
public DisruptorAsyncConsoleHandler() {
super(Optional.of(new DisruptorEmitter()), new ConsoleHandler());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package uk.dansiviter.logging;

import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.logging.ErrorManager;
import java.util.logging.LogRecord;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;


public class DisruptorEmitter implements Emitter<LogRecord> {
private static final int MAX_DRAIN = 5;
private AsyncHandler handler;
private Disruptor<LogEvent> disruptor;

@Override
public void init(AsyncHandler handler) {
this.handler = handler;
var maxBuffer = handler.property("maxBuffer").map(Integer::parseInt).orElseGet(Flow::defaultBufferSize);
this.disruptor = new Disruptor<>(LogEvent::new, maxBuffer, DaemonThreadFactory.INSTANCE); //, ProducerType.MULTI, new YieldingWaitStrategy());
this.disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
this.handler.doPublish(event.record);
});
this.disruptor.start();
}

@Override
public int submit(LogRecord item) {
this.disruptor.getRingBuffer().publishEvent((e, sequence, r) -> e.record = r, item);
return 1;
}

@Override
public void close() {

try {
for (int i = 0; hasBacklog(this.disruptor.getRingBuffer()) && i < MAX_DRAIN; i++) {
Thread.sleep(100);
}
} catch (InterruptedException e) {
this.handler.getErrorManager().error("Drain interrupted!", e, ErrorManager.CLOSE_FAILURE);
return;
}

try {
disruptor.shutdown(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
this.handler.getErrorManager().error("Shutdown timed out!", e, ErrorManager.CLOSE_FAILURE);
this.disruptor.halt();
}
}

// --- Static Methods ---

private static boolean hasBacklog(RingBuffer<?> buf) {
return !buf.hasAvailableCapacity(buf.getBufferSize());
}

// --- Inner Classes ---

private static class LogEvent {
private LogRecord record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static void main(String[] args) throws InterruptedException {
}

var syncHandler = new ConsoleHandler();
var asyncHandler = new AsyncConsoleHandler();
var asyncHandler = new DisruptorAsyncConsoleHandler();

var log = Logger.getLogger("TEST");

Expand Down
6 changes: 0 additions & 6 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
<artifactId>core</artifactId>

<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@
*/
package uk.dansiviter.logging;

import java.util.Optional;
import java.util.logging.ConsoleHandler;
import java.util.logging.LogRecord;

/**
* Async implementation of {@link ConsoleHandler} which simply delegates.
*/
public class AsyncConsoleHandler extends AsyncStreamHandler {
public AsyncConsoleHandler() {
super(new ConsoleHandler());
this(Optional.empty());
}

public AsyncConsoleHandler(Optional<Emitter<LogRecord>> emitter) {
super(emitter, new ConsoleHandler());
}
}
119 changes: 41 additions & 78 deletions core/src/main/java/uk/dansiviter/logging/AsyncHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.UnsupportedEncodingException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.ErrorManager;
import java.util.logging.Filter;
Expand All @@ -33,72 +32,68 @@

import javax.annotation.Nonnull;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

/**
* An abstract {@link Handler} that asynchronously delivers log messages. This
* leverages {@link java.util.concurrent.Flow} to handle log events.
* Back-pressure is handled by blocking the calling thread if the buffer is
* full. Therefore, to avoid a significant thread hang ensure the processing is
* done in a timely manner.
* An abstract {@link Handler} that asynchronously delivers log messages. This leverages
* {@link java.util.concurrent.Flow} to handle log events. Back-pressure is handled by blocking the
* calling thread if the buffer is full. Therefore, to avoid a significant thread hang ensure the processing is done in
* a timely manner.
* </p>
* <b>Configuration:</b> Using the following {@code LogManager} configuration
* properties, where {@code <handler-name>} refers to the fully-qualified class
* name of the handler:
* <b>Configuration:</b>
* Using the following {@code LogManager} configuration properties, where {@code <handler-name>} refers to the
* fully-qualified class name of the handler:
* <ul>
* <li>{@code &lt;handler-name&gt;.level} specifies the default level for the
* {@code Handler} (defaults to {@code INFO}).</li>
* <li>{@code &lt;handler-name&gt;.filter} specifies the name of a
* {@code Filter} class to use (defaults to no {@code Filter}).</li>
* <li>{@code &lt;handler-name&gt;.formatter} specifies the name of a
* {@code Formatter} class to use (defaults to
* {@link java.util.logging.SimpleFormatter}).</li>
* <li>{@code &lt;handler-name&gt;.encoding} the name of the character set
* encoding to use (defaults to the default platform encoding).</li>
* <li>{@code &lt;handler-name&gt;.maxBuffer} specifies the maximum buffer size
* level for the handler (defaults to
* {@link java.util.concurrent.Flow#defaultBufferSize()}).</li>
* <li> {@code &lt;handler-name&gt;.level}
* specifies the default level for the {@code Handler}
* (defaults to {@code INFO}). </li>
* <li> {@code &lt;handler-name&gt;.filter}
* specifies the name of a {@code Filter} class to use
* (defaults to no {@code Filter}). </li>
* <li> {@code &lt;handler-name&gt;.formatter}
* specifies the name of a {@code Formatter} class to use
* (defaults to {@link java.util.logging.SimpleFormatter}). </li>
* <li> {@code &lt;handler-name&gt;.encoding}
* the name of the character set encoding to use (defaults to
* the default platform encoding). </li>
* <li> {@code &lt;handler-name&gt;.maxBuffer}
* specifies the maximum buffer size level for the handler
* (defaults to {@link java.util.concurrent.Flow#defaultBufferSize()}). </li>
* </ul>
*/
public abstract class AsyncHandler extends Handler {
private static final int MAX_DRAIN = 5;
private final Disruptor<LogEvent> disruptor;

protected final AtomicBoolean closed = new AtomicBoolean();

public AsyncHandler() {
var manager = Objects.requireNonNull(LogManager.getLogManager());
private final LogManager manager;
private final Emitter<LogRecord> emitter;


setLevel(property(manager, "level").map(Level::parse).orElse(Level.INFO));
setFilter(property(manager, "filter").map(AsyncHandler::<Filter>instance).orElse(null));
setFormatter(property(manager, "formatter").map(AsyncHandler::<Formatter>instance).orElseGet(SimpleFormatter::new));
public AsyncHandler(Optional<Emitter<LogRecord>> emitter) {
manager = Objects.requireNonNull(LogManager.getLogManager());

setLevel(property("level").map(Level::parse).orElse(Level.INFO));
setFilter(property("filter").map(AsyncHandler::<Filter>instance).orElse(null));
setFormatter(property("formatter").map(AsyncHandler::<Formatter>instance).orElseGet(SimpleFormatter::new));
try {
setEncoding(property(manager, "encoding").orElse(null));
setEncoding(property("encoding").orElse(null));
} catch (UnsupportedEncodingException e) {
getErrorManager().error(e.getMessage(), e, ErrorManager.OPEN_FAILURE);
}

var maxBuffer = property(manager, "maxBuffer").map(Integer::parseInt).orElse(1024);

this.disruptor = new Disruptor<>(LogEvent::new, maxBuffer, DaemonThreadFactory.INSTANCE); //, ProducerType.MULTI, new YieldingWaitStrategy());
this.disruptor.handleEventsWith((event, sequence, endOfBatch) -> doPublish(event.record));
this.disruptor.start();
this.emitter = emitter.orElseGet(() -> new DefaultEmitter());
this.emitter.init(this);
}

/**
* Extracts the {@link LogManager#getProperty(String)}.
*
* @param manager the manager instance.
* @param name the name of the property.
* @param name the name of the property.
* @return the value as an {@link Optional}.
*/
protected Optional<String> property(@Nonnull LogManager manager, @Nonnull String name) {
Optional<String> property(@Nonnull String name) {
return Optional.ofNullable(manager.getProperty(getClass().getName() + "." + name));
}


// --- Static Methods ---

@SuppressWarnings("unchecked")
Expand All @@ -113,17 +108,13 @@ protected Optional<String> property(@Nonnull LogManager manager, @Nonnull String

@Override
public void publish(LogRecord record) {
if (this.closed.get()) {
throw new IllegalStateException("Handler closed!");
}

if (!isLoggable(record)) {
return;
}

record.getSourceClassName(); // ensure source is populated
record.getSourceClassName(); // ensure source is populated

this.disruptor.getRingBuffer().publishEvent((e, sequence, r) -> e.record = r, record);
this.emitter.submit(record);
}

/**
Expand All @@ -134,41 +125,13 @@ public void publish(LogRecord record) {
protected abstract void doPublish(LogRecord record);

@Override
public void flush() {
}
public void flush() { }

@Override
public void close() throws SecurityException {
if (!closed.compareAndSet(false, true)) {
throw new IllegalStateException("Already closed!");
}

try {
for (int i = 0; hasBacklog(this.disruptor.getRingBuffer()) && i < MAX_DRAIN; i++) {
Thread.sleep(100);
}
} catch (InterruptedException e) {
getErrorManager().error("Drain interrupted!", e, ErrorManager.CLOSE_FAILURE);
return;
}

try {
disruptor.shutdown(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
getErrorManager().error("Shutdown timed out!", e, ErrorManager.CLOSE_FAILURE);
this.disruptor.halt();
}
}

// --- Static Methods ---

private static boolean hasBacklog(RingBuffer<?> buf) {
return !buf.hasAvailableCapacity(buf.getBufferSize());
}

// --- Inner Classes ---

private static class LogEvent {
private LogRecord record;
this.emitter.close();
}
}
Loading

0 comments on commit 30ff400

Please sign in to comment.