Skip to content

Commit

Permalink
Improve the event executor SPI so that the provider does not need to …
Browse files Browse the repository at this point in the history
…implement inThread since it can be achieved internally by Vert.x
  • Loading branch information
vietj committed Aug 19, 2024
1 parent 57f0cf4 commit 597274f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 45 deletions.
59 changes: 44 additions & 15 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -491,25 +491,54 @@ public ContextInternal getOrCreateContext() {
Thread thread = Thread.currentThread();
ContextInternal ctx = getContext(thread);
if (ctx == null) {
if (thread instanceof VertxThread && ((VertxThread) thread).owner == this) {
if (((VertxThread)thread).isWorker()) {
return createWorkerContext(eventLoopGroup.next(), workerPool, null);
} else {
io.netty.util.concurrent.EventExecutor eventLoop = ThreadExecutorMap.currentExecutor();
return createEventLoopContext((EventLoop) eventLoop, workerPool, null);
}
return createContext(thread);
}
return ctx;
}

private ContextInternal createContext(Thread thread) {
if (thread instanceof VertxThread && ((VertxThread) thread).owner == this) {
if (((VertxThread)thread).isWorker()) {
return createWorkerContext(eventLoopGroup.next(), workerPool, null);
} else {
EventLoop eventLoop = stickyEventLoop();
EventExecutor eventExecutor;
if (eventExecutorProvider != null && (eventExecutor = eventExecutorProvider.eventExecutorFor(Thread.currentThread())) != null) {
ctx = new ContextImpl(this, createContextLocals(), eventLoop, ThreadingModel.OTHER, eventExecutor, workerPool, new TaskQueue(), null, closeFuture, Thread.currentThread().getContextClassLoader());
} else {
ctx = createEventLoopContext(eventLoop, workerPool, Thread.currentThread().getContextClassLoader());
io.netty.util.concurrent.EventExecutor eventLoop = ThreadExecutorMap.currentExecutor();
return createEventLoopContext((EventLoop) eventLoop, workerPool, null);
}
} else {
ContextInternal ctx;
EventLoop eventLoop = stickyEventLoop();
EventExecutor eventExecutor = null;
if (eventExecutorProvider != null) {
java.util.concurrent.Executor executor = eventExecutorProvider.eventExecutorFor(thread);
if (executor != null) {
eventExecutor = new EventExecutor() {
final ThreadLocal<Boolean> inThread = new ThreadLocal<>();
@Override
public boolean inThread() {
return inThread.get() != null;
}
@Override
public void execute(Runnable command) {
executor.execute(() -> {
inThread.set(true);
try {
command.run();
} finally {
inThread.remove();
}
});
}
};
}
stickyContext.set(new WeakReference<>(ctx));
}
if (eventExecutor != null) {
ctx = new ContextImpl(this, createContextLocals(), eventLoop, ThreadingModel.OTHER, eventExecutor, workerPool, new TaskQueue(), null, closeFuture, Thread.currentThread().getContextClassLoader());
} else {
ctx = createEventLoopContext(eventLoop, workerPool, Thread.currentThread().getContextClassLoader());
}
stickyContext.set(new WeakReference<>(ctx));
return ctx;
}
return ctx;
}

private EventLoop stickyEventLoop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package io.vertx.core.spi.context.executor;

import io.vertx.codegen.annotations.Unstable;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.internal.VertxBootstrap;
import io.vertx.core.spi.VertxServiceProvider;

Expand All @@ -31,12 +30,12 @@ default void init(VertxBootstrap builder) {
}

/**
* Give vertx an event executor for the given {@code thread}.
* Provide to vertx an executor for the given {@code thread}, that will execute context tasks.
*
* @param thread the thread for which an executor is required
* @return an event executor suitable for the given thread, tasks executed on this executor will be declared as
* @return an executor suitable for the given thread, tasks executed on this executor will be declared as
* running on {@link io.vertx.core.ThreadingModel#OTHER}.
*/
EventExecutor eventExecutorFor(Thread thread);
java.util.concurrent.Executor eventExecutorFor(Thread thread);

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@
*/
package io.vertx.it.eventexecutor;

import io.vertx.core.internal.EventExecutor;
import io.vertx.core.spi.context.executor.EventExecutorProvider;

import java.util.Deque;
import java.util.LinkedList;
import java.util.NoSuchElementException;

public class CustomEventExecutorProvider implements EventExecutorProvider, EventExecutor {
public class CustomEventExecutorProvider implements EventExecutorProvider, java.util.concurrent.Executor {

private static final Deque<Runnable> tasks = new LinkedList<>();
private static volatile Thread executing;

static synchronized boolean hasNext() {
return !tasks.isEmpty();
Expand All @@ -38,24 +36,14 @@ public void run() {
throw new IllegalStateException();
}
executed = true;
executing = Thread.currentThread();
try {
task.run();
} finally {
executing = null;
}
task.run();
}
}
};
}
throw new NoSuchElementException();
}

@Override
public boolean inThread() {
return executing == Thread.currentThread();
}

@Override
public void execute(Runnable command) {
synchronized (CustomEventExecutorProvider.class) {
Expand All @@ -64,7 +52,7 @@ public void execute(Runnable command) {
}

@Override
public EventExecutor eventExecutorFor(Thread thread) {
public java.util.concurrent.Executor eventExecutorFor(Thread thread) {
if (thread instanceof CustomThread) {
return this;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,31 @@
import io.vertx.core.Context;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.internal.VertxBootstrap;
import io.vertx.test.core.AsyncTestBase;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;

public class EventExecutorProviderTest extends AsyncTestBase {

@Test
public void testExecuteTasks() {
Deque<Runnable> toRun = new ConcurrentLinkedDeque<>();
VertxBootstrap bootstrap = VertxBootstrap.create();
bootstrap.eventExecutorProvider(thread -> new EventExecutor() {
@Override
public boolean inThread() {
return thread == Thread.currentThread();
}
@Override
public void execute(Runnable command) {
toRun.add(command);
}
});
bootstrap.eventExecutorProvider(thread -> toRun::add);
bootstrap.init();
Vertx vertx = bootstrap.vertx();
Context ctx = vertx.getOrCreateContext();
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
assertEquals(ThreadingModel.OTHER, ctx.threadingModel());
assertEquals(0, toRun.size());
int[] cnt = new int[1];
ctx.runOnContext(v -> {
assertTrue(ctx.inThread());
assertSame(ctx, Vertx.currentContext());
assertSame(ctx, vertx.getOrCreateContext());
cnt[0]++;
Expand Down

0 comments on commit 597274f

Please sign in to comment.