Skip to content

Commit

Permalink
added configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Dec 11, 2024
1 parent 9c407db commit 1ea6630
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ interface ConsumerQueueConfiguration extends QueueConfiguration {
@Positive
int getFork();

boolean isVirtualThreadsCompatible();

/**
* @param overrides the configuration which non-default values will override the values in this configuration
* @return self with the values overridden from the other configuration object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public interface WorkerConfiguration {

String DEFAULT_SCHEDULER = TaskExecutors.SCHEDULED;
boolean DEFAULT_VIRTUAL_THREAD_COMPATIBLE = false;

WorkerConfiguration ENABLED = new WorkerConfiguration() {
@Override
Expand All @@ -39,6 +40,11 @@ public String getScheduler() {
return DEFAULT_SCHEDULER;
}

@Override
public boolean isVirtualThreadCompatible() {
return DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
}

};

boolean isEnabled();
Expand All @@ -50,4 +56,6 @@ public String getScheduler() {

String getScheduler();

boolean isVirtualThreadCompatible();

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "isVirtualThreadCompatible")
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "isVirtualThreadCompatible")
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "isVirtualThreadCompatible")
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,9 @@
*/
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public String toString() {
int concurrency;
boolean leaderOnly;
boolean followerOnly;
boolean virtualThreadsCompatible;

@Nullable private String cron;
@Nullable private Duration fixedDelay;
Expand Down Expand Up @@ -269,10 +270,21 @@ public int getFork() {
return fork;
}

@Override
public void setFork(int fork) {
this.fork = fork;
}

@Override
public boolean isVirtualThreadsCompatible() {
return virtualThreadsCompatible;
}

@Override
public void setVirtualThreadsCompatible(boolean virtualThreadCompatible) {
this.virtualThreadsCompatible = virtualThreadCompatible;
}

@Override
public DefaultConsumerQueueConfiguration getConsumer() {
return consumer;
Expand Down Expand Up @@ -311,6 +323,10 @@ public JobConfiguration mergeWith(JobConfiguration overrides) {
this.leaderOnly = false;
}

if (overrides.isVirtualThreadsCompatible() != WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE) {
this.virtualThreadsCompatible = overrides.isVirtualThreadsCompatible();
}

if (StringUtils.isNotEmpty(overrides.getCron())) {
this.cron = overrides.getCron();
this.initialDelay = null;
Expand Down Expand Up @@ -351,7 +367,7 @@ public JobConfiguration mergeWith(JobConfiguration overrides) {

@Override
public String toString() {
return "DefaultJobConfiguration{name='%s', enabled=%s, concurrency=%d, leaderOnly=%s, followerOnly=%s, cron='%s', fixedDelay=%s, initialDelay=%s, fixedRate=%s, scheduler='%s', fork=%d, consumer=%s, producer=%s}"
return "DefaultJobConfiguration{name='%s', enabled=%s, concurrency=%d, leaderOnly=%s, followerOnly=%s, cron=%s, fixedDelay=%s, initialDelay=%s, fixedRate=%s, scheduler='%s', fork=%d, consumer=%s, producer=%s}"
.formatted(name, enabled, concurrency, leaderOnly, followerOnly, cron, fixedDelay, initialDelay, fixedRate, scheduler, fork, consumer, producer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ interface MutableConsumerQueueConfiguration extends MutableQueueConfiguration, J

void setLeaderOnly(boolean leaderOnly);

void setVirtualThreadsCompatible(boolean leaderOnly);

void setFollowerOnly(boolean followerOnly);

void setCron(@Nullable String cron);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void close() {

@Override
public ExecutorService getExecutorService(Job job) {
return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork());
return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork(), job.getConfiguration().isVirtualThreadsCompatible());
}

@Override
Expand All @@ -73,28 +73,30 @@ public TaskScheduler getTaskScheduler(Job job) {
}

return optionalTaskScheduler.orElseGet(() -> {
ExecutorService executor = getExecutor(schedulerName, configuration.getFork());
ExecutorService executor = getExecutor(schedulerName, configuration.getFork(), configuration.isVirtualThreadsCompatible());
ScheduledExecutorTaskScheduler scheduler = new ScheduledExecutorTaskScheduler(executor);
beanContext.registerSingleton(TaskScheduler.class, scheduler, Qualifiers.byName(schedulerName));
return scheduler;
});
}

private ExecutorService getExecutor(String schedulerName, int fork) {
private ExecutorService getExecutor(String schedulerName, int fork, boolean virtualThreadsCompatible) {
if (createdExecutors.containsKey(schedulerName)) {
return createdExecutors.get(schedulerName);
}

Qualifier<ExecutorService> byName = Qualifiers.byName(schedulerName);

boolean useVirtualThreads = LoomSupport.isSupported() && virtualThreadsCompatible;

return beanContext
.findBean(ExecutorService.class, byName)
.filter(ScheduledExecutorService.class::isInstance)
.orElseGet(() -> {
// TODO: also add configuration to the job
ExecutorService service = Executors.newScheduledThreadPool(
LoomSupport.isSupported() ? 0 : fork,
LoomSupport.isSupported() ? LoomSupport.newVirtualThreadFactory(schedulerName) : new NamedThreadFactory(schedulerName)
useVirtualThreads ? 0 : fork,
useVirtualThreads ? LoomSupport.newVirtualThreadFactory(schedulerName) : new NamedThreadFactory(schedulerName)
);

createdExecutors.put(schedulerName, service);
Expand Down

0 comments on commit 1ea6630

Please sign in to comment.