Skip to content

Commit

Permalink
Merge pull request #56 from agorapulse/feature/virtual-threads-support
Browse files Browse the repository at this point in the history
virtual threads support for implicitly created executors
  • Loading branch information
musketyr authored Dec 11, 2024
2 parents bcf62a3 + ab07234 commit ac85c2a
Show file tree
Hide file tree
Showing 27 changed files with 140 additions and 20 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ jobs:
check:
name: Check
runs-on: ubuntu-latest
strategy:
matrix:
java: [ 17, 21 ]
env:
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v3
with:
distribution: corretto
java-version: 17
java-version: ${{ matrix.java }}
- uses: gradle/gradle-command-action@v2
with:
arguments: check
arguments: check -PjavaVersion=${{ matrix.java }}
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ gradleProjects {
dirs(['libs']) { Project subproject ->
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
languageVersion = JavaLanguageVersion.of(javaVersion)
}
}

Expand Down
10 changes: 8 additions & 2 deletions docs/guide/src/docs/asciidoc/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,16 @@ You can override the default scheduler (`TaskExecutors.SCHEDULED`) by setting th
[source,yaml]
.Setting the Default Scheduler
----
include::{root-dir}/libs/micronaut-worker/src/test/resources/application-virtual.yml[]
include::{root-dir}/libs/micronaut-worker/src/test/resources/application-scheduler.yml[]
----

TIP: You can let your jobs executed using virtual threads by using `virtual` executor.
You can use virtual thread factory with all the implicit executor services by setting the `worker.virtual-threads-compatible` property to `true`. Implicit executor services are created for every job that does not customize its own `scheudler` configuration property.

[source,yaml]
.Enabling Virtual Threads for Implicit Executors
----
include::{root-dir}/libs/micronaut-worker/src/test/resources/application-virtual-threads.yml[]
----

=== Job Configuration

Expand Down
2 changes: 2 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ gruVersion=2.0.5
lettuceVersion=5.2.1.RELEASE
byteBuddyVersion=1.10.22
testRetryPluginVersion=1.5.0

javaVersion = 17
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"producer": {
"queueName": "Sample",
"queueType": null
}
},
"virtualThreadsCompatible": false
},
"status": {
"executionCount": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class LongRunningJob {
}

@Concurrency(2)
@Job(initialDelay = JOBS_INITIAL_DELAY)
@Job(initialDelay = JOBS_INITIAL_DELAY, virtualThreadCompatible = true)
@Consumes(value = CONCURRENT_CONSUMER_QUEUE_NAME, maxMessages = 3)
void executeConcurrentConsumer(String message) {
if (FAILING_MESSAGE == message) {
Expand All @@ -112,7 +112,7 @@ class LongRunningJob {
}

@Fork(2)
@Job(initialDelay = JOBS_INITIAL_DELAY)
@Job(initialDelay = JOBS_INITIAL_DELAY, virtualThreadCompatible = true)
@Consumes(value = FORKED_CONSUMER_QUEUE_NAME, maxMessages = 4)
void executeForkConsumer(String message) {
if (FAILING_MESSAGE == message) {
Expand All @@ -122,7 +122,7 @@ class LongRunningJob {
consumedForkMessages.add(message)
}

@Job(initialDelay = JOBS_INITIAL_DELAY)
@Job(initialDelay = JOBS_INITIAL_DELAY, virtualThreadCompatible = true)
@Consumes(value = REGULAR_CONSUMER_QUEUE_NAME, maxMessages = 3)
void executeRegularConsumer(String message) {
if (FAILING_MESSAGE == message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ interface ConsumerQueueConfiguration extends QueueConfiguration {
@Positive
int getFork();

boolean isVirtualThreadsCompatible();

/**
* @param overrides the configuration which non-default values will override the values in this configuration
* @return self with the values overridden from the other configuration object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public interface WorkerConfiguration {

String DEFAULT_SCHEDULER = TaskExecutors.SCHEDULED;
boolean DEFAULT_VIRTUAL_THREAD_COMPATIBLE = false;

WorkerConfiguration ENABLED = new WorkerConfiguration() {
@Override
Expand All @@ -39,6 +40,11 @@ public String getScheduler() {
return DEFAULT_SCHEDULER;
}

@Override
public boolean isVirtualThreadsCompatible() {
return DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
}

};

boolean isEnabled();
Expand All @@ -50,4 +56,6 @@ public String getScheduler() {

String getScheduler();

boolean isVirtualThreadsCompatible();

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package com.agorapulse.worker.annotation;

import com.agorapulse.worker.WorkerConfiguration;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand All @@ -35,4 +37,9 @@
* @return the number of jobs running in parallel on a one server
*/
int value();

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,9 @@
*/
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public String toString() {
int concurrency;
boolean leaderOnly;
boolean followerOnly;
boolean virtualThreadsCompatible;

@Nullable private String cron;
@Nullable private Duration fixedDelay;
Expand Down Expand Up @@ -273,10 +274,21 @@ public int getFork() {
return fork;
}

@Override
public void setFork(int fork) {
this.fork = fork;
}

@Override
public boolean isVirtualThreadsCompatible() {
return virtualThreadsCompatible;
}

@Override
public void setVirtualThreadsCompatible(boolean virtualThreadCompatible) {
this.virtualThreadsCompatible = virtualThreadCompatible;
}

@Override
public DefaultConsumerQueueConfiguration getConsumer() {
return consumer;
Expand Down Expand Up @@ -315,6 +327,10 @@ public JobConfiguration mergeWith(JobConfiguration overrides) {
this.leaderOnly = false;
}

if (overrides.isVirtualThreadsCompatible() != WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE) {
this.virtualThreadsCompatible = overrides.isVirtualThreadsCompatible();
}

if (StringUtils.isNotEmpty(overrides.getCron())) {
this.cron = overrides.getCron();
this.initialDelay = null;
Expand Down Expand Up @@ -355,7 +371,7 @@ public JobConfiguration mergeWith(JobConfiguration overrides) {

@Override
public String toString() {
return "DefaultJobConfiguration{name='%s', enabled=%s, concurrency=%d, leaderOnly=%s, followerOnly=%s, cron='%s', fixedDelay=%s, initialDelay=%s, fixedRate=%s, scheduler='%s', fork=%d, consumer=%s, producer=%s}"
.formatted(name, enabled, concurrency, leaderOnly, followerOnly, cron, fixedDelay, initialDelay, fixedRate, scheduler, fork, consumer, producer);
return "DefaultJobConfiguration{name='%s', enabled=%s, concurrency=%d, leaderOnly=%s, followerOnly=%s, cron=%s, fixedDelay=%s, initialDelay=%s, fixedRate=%s, scheduler='%s', fork=%d, consumer=%s, producer=%s, virtualThreadsCompatible=%s}"
.formatted(name, enabled, concurrency, leaderOnly, followerOnly, cron, fixedDelay, initialDelay, fixedRate, scheduler, fork, consumer, producer, virtualThreadsCompatible);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class DefaultWorkerConfiguration implements WorkerConfiguration {

private boolean enabled;
private boolean virtualThreadsCompatible = WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
private String queueType;
private String scheduler = WorkerConfiguration.DEFAULT_SCHEDULER;

Expand Down Expand Up @@ -62,4 +63,13 @@ public void setScheduler(String scheduler) {
this.scheduler = scheduler;
}

@Override
public boolean isVirtualThreadsCompatible() {
return virtualThreadsCompatible;
}

public void setVirtualThreadsCompatible(boolean virtualThreadsCompatible) {
this.virtualThreadsCompatible = virtualThreadsCompatible;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ interface MutableConsumerQueueConfiguration extends MutableQueueConfiguration, J

void setLeaderOnly(boolean leaderOnly);

void setVirtualThreadsCompatible(boolean leaderOnly);

void setFollowerOnly(boolean followerOnly);

void setCron(@Nullable String cron);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,10 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,11 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.scheduling.LoomSupport;
import io.micronaut.scheduling.ScheduledExecutorTaskScheduler;
import io.micronaut.scheduling.TaskScheduler;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void close() {

@Override
public ExecutorService getExecutorService(Job job) {
return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork());
return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork(), job.getConfiguration().isVirtualThreadsCompatible());
}

@Override
Expand All @@ -72,25 +73,30 @@ public TaskScheduler getTaskScheduler(Job job) {
}

return optionalTaskScheduler.orElseGet(() -> {
ExecutorService executor = getExecutor(schedulerName, configuration.getFork());
ExecutorService executor = getExecutor(schedulerName, configuration.getFork(), configuration.isVirtualThreadsCompatible());
ScheduledExecutorTaskScheduler scheduler = new ScheduledExecutorTaskScheduler(executor);
beanContext.registerSingleton(TaskScheduler.class, scheduler, Qualifiers.byName(schedulerName));
return scheduler;
});
}

private ExecutorService getExecutor(String schedulerName, int fork) {
private ExecutorService getExecutor(String schedulerName, int fork, boolean virtualThreadsCompatible) {
if (createdExecutors.containsKey(schedulerName)) {
return createdExecutors.get(schedulerName);
}

Qualifier<ExecutorService> byName = Qualifiers.byName(schedulerName);

boolean useVirtualThreads = LoomSupport.isSupported() && virtualThreadsCompatible;

return beanContext
.findBean(ExecutorService.class, byName)
.filter(ScheduledExecutorService.class::isInstance)
.orElseGet(() -> {
ExecutorService service = Executors.newScheduledThreadPool(fork, new NamedThreadFactory(schedulerName));
ExecutorService service = Executors.newScheduledThreadPool(
useVirtualThreads ? 0 : fork,
useVirtualThreads ? LoomSupport.newVirtualThreadFactory(schedulerName) : new NamedThreadFactory(schedulerName)
);

createdExecutors.put(schedulerName, service);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class MethodJobProcessor implements ExecutableMethodProcessor<Job> {
private static final String MEMBER_MAX_MESSAGES = "maxMessages";
private static final String MEMBER_WAITING_TIME = "waitingTime";
private static final String MEMBER_SCHEDULER = "scheduler";
private static final String MEMBER_VIRTUAL_THREADS_COMPATIBLE = "scheduler";
private static final String MEMBER_TYPE = "type";
private static final String MEMBER_VALUE = "value";
private static final String MEMBER_NAME = "name";
Expand Down Expand Up @@ -118,6 +119,16 @@ public class MethodJobProcessor implements ExecutableMethodProcessor<Job> {
QueueConsumer.class.getName(), MEMBER_SCHEDULER,
QueueProducer.class.getName(), MEMBER_SCHEDULER
);
private static final Map<String, String> ANNOTATION_TO_VIRTUAL_THREADS_COMPATIBLE_MAP = Map.of(
Job.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
Cron.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
FixedRate.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
InitialDelay.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
FixedDelay.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
QueueConsumer.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
QueueProducer.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
Fork.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE
);

private static final Map<String, String> ANNOTATION_TO_FORK_MAP = Map.of(
Fork.class.getName(), MEMBER_VALUE,
Expand Down Expand Up @@ -261,6 +272,7 @@ private JobConfiguration getJobConfiguration(BeanDefinition<?> beanDefinition, E
getFirstAnnotationValue(ANNOTATION_TO_FIXED_RATE_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(fixedRate -> configuration.setFixedRate(convertDuration(jobName, fixedRate, "fixed rate")));
getFirstAnnotationValue(ANNOTATION_TO_INITIAL_DELAY_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(initialDelay -> configuration.setInitialDelay(convertDuration(jobName, initialDelay, "initial delay")));
getFirstAnnotationValue(ANNOTATION_TO_SCHEDULER_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(configuration::setScheduler);
getFirstAnnotationValue(ANNOTATION_TO_VIRTUAL_THREADS_COMPATIBLE_MAP, method::booleanValue, Boolean::booleanValue).ifPresent(configuration::setVirtualThreadsCompatible);

boolean consumer = method.getArguments().length == 1;
boolean producer = !method.getReturnType().getType().equals(void.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
worker:
scheduler: io
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
worker:
virtual-threads-compatible: true

This file was deleted.

Loading

0 comments on commit ac85c2a

Please sign in to comment.