Skip to content

Commit

Permalink
Merge pull request #5330 from eclipse-vertx/inbound-message-queue-han…
Browse files Browse the repository at this point in the history
…dling

Various JPMS related changes
  • Loading branch information
vietj authored Sep 23, 2024
2 parents e75a0a1 + cf49c9c commit 6947a7c
Show file tree
Hide file tree
Showing 19 changed files with 94 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.ref.Cleaner;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* A lightweight proxy of Vert.x {@link HttpClient} that can be collected by the garbage collector and release
Expand Down Expand Up @@ -99,6 +100,11 @@ public Metrics getMetrics() {
return delegate.getMetrics();
}

@Override
public Function<HttpClientResponse, Future<RequestOptions>> redirectHandler() {
return delegate.redirectHandler();
}

@Override
public NetClientInternal netClient() {
return delegate.netClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ private static class StreamImpl extends Stream implements HttpClientStream {
super(context, promise, id);

this.conn = conn;
this.queue = new InboundMessageQueue<>(conn.context.nettyEventLoop(), context) {
this.queue = new InboundMessageQueue<>(conn.context.eventLoop(), context.executor()) {
@Override
protected void handleResume() {
conn.doResume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class Http1xServerRequest extends HttpServerRequestInternal implements io
this.conn = conn;
this.context = context;
this.request = request;
this.queue = new InboundMessageQueue<>(context.nettyEventLoop(), context) {
this.queue = new InboundMessageQueue<>(context.eventLoop(), context.executor()) {
@Override
protected void handleMessage(Object elt) {
if (elt == InboundBuffer.END_SENTINEL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
this.conn = conn;
this.vertx = conn.vertx();
this.context = context;
this.inboundQueue = new InboundMessageQueue<>(conn.channel().eventLoop(), context) {
this.inboundQueue = new InboundMessageQueue<>(conn.context().eventLoop(), context.executor()) {
@Override
protected void handleMessage(Object item) {
if (item instanceof MultiMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public abstract class WebSocketImplBase<S extends WebSocket> implements WebSocke
this.context = context;
this.maxWebSocketFrameSize = maxWebSocketFrameSize;
this.maxWebSocketMessageSize = maxWebSocketMessageSize;
this.pending = new InboundMessageQueue<>(context.nettyEventLoop(), context) {
this.pending = new InboundMessageQueue<>(context.eventLoop(), context.executor()) {
@Override
protected void handleResume() {
conn.doResume();
Expand Down
3 changes: 0 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/ContextBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ abstract class ContextBase implements ContextInternal {
this.locals = locals;
}

@Override
public abstract EventExecutor executor();

public ContextInternal beginDispatch() {
VertxImpl vertx = (VertxImpl) owner();
return vertx.beginDispatch(this);
Expand Down
11 changes: 8 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {
private final DeploymentContext deployment;
private final CloseFuture closeFuture;
private final ClassLoader tccl;
private final EventLoop eventLoop;
private final EventLoopExecutor eventLoop;
private final ThreadingModel threadingModel;
private final EventExecutor executor;
private ConcurrentMap<Object, Object> data;
Expand All @@ -53,7 +53,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {

public ContextImpl(VertxInternal vertx,
Object[] locals,
EventLoop eventLoop,
EventLoopExecutor eventLoop,
ThreadingModel threadingModel,
EventExecutor executor,
WorkerPool workerPool,
Expand Down Expand Up @@ -96,7 +96,7 @@ public JsonObject config() {
}

public EventLoop nettyEventLoop() {
return eventLoop;
return eventLoop.eventLoop;
}

public VertxInternal owner() {
Expand All @@ -108,6 +108,11 @@ public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean or
return workerPool.executeBlocking(this, blockingCodeHandler, ordered ? orderedTasks : null);
}

@Override
public EventExecutor eventLoop() {
return eventLoop;
}

@Override
public EventExecutor executor() {
return executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public Context exceptionHandler(Handler<Throwable> handler) {
return this;
}

@Override
public EventExecutor eventLoop() {
return delegate.eventLoop();
}

@Override
public EventExecutor executor() {
return delegate.executor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class EventLoopExecutor implements EventExecutor {
public final class EventLoopExecutor implements EventExecutor {

private final EventLoop eventLoop;
final EventLoop eventLoop;

public EventLoopExecutor(EventLoop eventLoop) {
this.eventLoop = eventLoop;
}

public EventLoop eventLoop() {
return eventLoop;
}

@Override
public boolean inThread() {
return eventLoop.inEventLoop();
Expand Down
11 changes: 8 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/ShadowContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,30 @@ final class ShadowContext extends ContextBase {

final VertxInternal owner;
final ContextBase delegate;
private final EventLoop eventLoop;
private final EventLoopExecutor eventLoop;
private final TaskQueue orderedTasks;

public ShadowContext(VertxInternal owner, EventLoop eventLoop, ContextInternal delegate) {
public ShadowContext(VertxInternal owner, EventLoopExecutor eventLoop, ContextInternal delegate) {
super(((ContextBase)delegate).locals);
this.owner = owner;
this.eventLoop = eventLoop;
this.delegate = (ContextBase) delegate;
this.orderedTasks = new TaskQueue();
}

@Override
public EventExecutor eventLoop() {
return eventLoop;
}

@Override
public EventExecutor executor() {
return delegate.executor();
}

@Override
public EventLoop nettyEventLoop() {
return eventLoop;
return eventLoop.eventLoop;
}

@Override
Expand Down
10 changes: 6 additions & 4 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ private ContextInternal createContext(Thread thread) {
} else {
ContextInternal ctx;
EventLoop eventLoop = stickyEventLoop();
EventLoopExecutor eventLoopExecutor = new EventLoopExecutor(eventLoop);
EventExecutor eventExecutor = null;
if (eventExecutorProvider != null) {
java.util.concurrent.Executor executor = eventExecutorProvider.eventExecutorFor(thread);
Expand All @@ -516,7 +517,7 @@ public void execute(Runnable command) {
}
}
if (eventExecutor != null) {
ctx = new ContextImpl(this, createContextLocals(), eventLoop, ThreadingModel.OTHER, eventExecutor, workerPool, new TaskQueue(), null, closeFuture, Thread.currentThread().getContextClassLoader());
ctx = new ContextImpl(this, createContextLocals(), eventLoopExecutor, ThreadingModel.OTHER, eventExecutor, workerPool, new TaskQueue(), null, closeFuture, Thread.currentThread().getContextClassLoader());
} else {
ctx = createEventLoopContext(eventLoop, workerPool, Thread.currentThread().getContextClassLoader());
}
Expand Down Expand Up @@ -578,12 +579,13 @@ public ContextImpl createContext(ThreadingModel threadingModel,
DeploymentContext deployment,
ClassLoader tccl) {
EventExecutor eventExecutor;
EventLoopExecutor eventLoopExecutor = new EventLoopExecutor(eventLoop);
TaskQueue orderedTasks = new TaskQueue();
WorkerPool wp;
switch (threadingModel) {
case EVENT_LOOP:
wp = workerPool != null ? workerPool : this.workerPool;
eventExecutor = new EventLoopExecutor(eventLoop);
eventExecutor = eventLoopExecutor;
break;
case WORKER:
wp = workerPool != null ? workerPool : this.workerPool;
Expand All @@ -601,7 +603,7 @@ public ContextImpl createContext(ThreadingModel threadingModel,
}
return new ContextImpl(this,
createContextLocals(),
eventLoop,
eventLoopExecutor,
threadingModel,
eventExecutor,
wp,
Expand Down Expand Up @@ -693,7 +695,7 @@ private ContextInternal getContext(Thread thread) {
}
} else {
EventLoop eventLoop = stickyEventLoop();
return new ShadowContext(this, eventLoop, context);
return new ShadowContext(this, new EventLoopExecutor(eventLoop), context);
}
} else {
WeakReference<ContextInternal> ref = stickyContext.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ default void runOnContext(Handler<Void> action) {
*/
EventExecutor executor();

/**
* @return the event loop executor of this context
*/
EventExecutor eventLoop();

/**
* Return the Netty EventLoop used by this Context. This can be used to integrate
* a Netty Server with a Vert.x runtime, specially the Context part.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
*/
package io.vertx.core.internal.concurrent;

import io.netty.channel.EventLoop;
import io.vertx.core.ThreadingModel;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.impl.EventLoopExecutor;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.streams.impl.InboundReadQueue;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -25,8 +24,8 @@ public class InboundMessageQueue<M> implements Predicate<M>, Runnable {

private static final AtomicLongFieldUpdater<InboundMessageQueue<?>> DEMAND_UPDATER = (AtomicLongFieldUpdater<InboundMessageQueue<?>>) (AtomicLongFieldUpdater)AtomicLongFieldUpdater.newUpdater(InboundMessageQueue.class, "demand");

private final ContextInternal context;
private final EventLoop eventLoop;
private final EventExecutor consumer;
private final EventExecutor producer;
private final InboundReadQueue<M> readQueue;

// Accessed by context thread
Expand All @@ -36,28 +35,34 @@ public class InboundMessageQueue<M> implements Predicate<M>, Runnable {
// Any thread
private volatile long demand = Long.MAX_VALUE;

public InboundMessageQueue(EventLoop eventLoop, ContextInternal context) {
public InboundMessageQueue(EventExecutor producer, EventExecutor consumer) {
InboundReadQueue.Factory readQueueFactory;
if (context.threadingModel() == ThreadingModel.EVENT_LOOP && context.nettyEventLoop() == eventLoop) {
if (consumer instanceof EventLoopExecutor && producer instanceof EventLoopExecutor && ((EventLoopExecutor)consumer).eventLoop() == ((EventLoopExecutor)producer).eventLoop()) {
readQueueFactory = InboundReadQueue.SINGLE_THREADED;
} else {
readQueueFactory = InboundReadQueue.SPSC;
}
this.readQueue = readQueueFactory.create(this);
this.context = context;
this.eventLoop = eventLoop;
this.consumer = consumer;
this.producer = producer;
}

public InboundMessageQueue(EventLoop eventLoop, ContextInternal context, int lowWaterMark, int highWaterMark) {
public InboundMessageQueue(EventExecutor producer, EventExecutor consumer, InboundReadQueue.Factory readQueueFactory) {
this.readQueue = readQueueFactory.create(this);
this.consumer = consumer;
this.producer = producer;
}

public InboundMessageQueue(EventExecutor producer, EventExecutor consumer, int lowWaterMark, int highWaterMark) {
InboundReadQueue.Factory readQueueFactory;
if (context.threadingModel() == ThreadingModel.EVENT_LOOP && context.nettyEventLoop() == eventLoop) {
if (consumer instanceof EventLoopExecutor && producer instanceof EventLoopExecutor && ((EventLoopExecutor)consumer).eventLoop() == ((EventLoopExecutor)producer).eventLoop()) {
readQueueFactory = InboundReadQueue.SINGLE_THREADED;
} else {
readQueueFactory = InboundReadQueue.SPSC;
}
this.readQueue = readQueueFactory.create(this, lowWaterMark, highWaterMark);
this.context = context;
this.eventLoop = eventLoop;
this.consumer = consumer;
this.producer = consumer;
}

@Override
Expand Down Expand Up @@ -101,7 +106,7 @@ protected void handleMessage(M msg) {
* @return {@code true} when a {@link #drain()} should be called.
*/
public final boolean add(M msg) {
assert eventLoop.inEventLoop();
assert producer.inThread();
int res = readQueue.add(msg);
if ((res & InboundReadQueue.QUEUE_UNWRITABLE_MASK) != 0) {
handlePause();
Expand Down Expand Up @@ -139,11 +144,11 @@ public final void write(M msg) {
* Schedule a drain operation on the context thread.
*/
public final void drain() {
assert eventLoop.inEventLoop();
if (context.inThread()) {
assert producer.inThread();
if (consumer.inThread()) {
drainInternal();
} else {
context.execute(this::drainInternal);
consumer.execute(this::drainInternal);
}
}

Expand All @@ -152,7 +157,7 @@ public final void drain() {
*/
@Override
public void run() {
assert context.inThread();
assert consumer.inThread();
if (!draining && needsDrain) {
drainInternal();
}
Expand All @@ -164,7 +169,7 @@ private void drainInternal() {
int res = readQueue.drain();
needsDrain = (res & InboundReadQueue.DRAIN_REQUIRED_MASK) != 0;
if ((res & InboundReadQueue.QUEUE_WRITABLE_MASK) != 0) {
eventLoop.execute(this::handleResume);
producer.execute(this::handleResume);
}
} finally {
draining = false;
Expand Down Expand Up @@ -198,8 +203,7 @@ public final void fetch(long amount) {
break;
}
}
context
.executor()
consumer
.execute(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.vertx.core.internal.net.NetClientInternal;
import io.vertx.core.spi.metrics.MetricsProvider;

import java.util.function.Function;

/**
* Http client internal API.
*/
Expand All @@ -28,6 +30,8 @@ public interface HttpClientInternal extends HttpClientAgent, MetricsProvider, Cl
*/
VertxInternal vertx();

Function<HttpClientResponse, Future<RequestOptions>> redirectHandler();

HttpClientOptions options();

NetClientInternal netClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public NetSocketImpl(ContextInternal context,
this.metrics = metrics;
this.messageHandler = new DataMessageHandler();
this.negotiatedApplicationLayerProtocol = negotiatedApplicationLayerProtocol;
this.pending = new InboundMessageQueue<>(context.nettyEventLoop(), context) {
this.pending = new InboundMessageQueue<>(context.eventLoop(), context.executor()) {
@Override
protected void handleResume() {
NetSocketImpl.this.doResume();
Expand Down
Loading

0 comments on commit 6947a7c

Please sign in to comment.