diff --git a/docs/guide/src/docs/asciidoc/usage.adoc b/docs/guide/src/docs/asciidoc/usage.adoc index 3ddc8f26..1c706ee5 100644 --- a/docs/guide/src/docs/asciidoc/usage.adoc +++ b/docs/guide/src/docs/asciidoc/usage.adoc @@ -91,6 +91,8 @@ worker: <4> The scheduler definition using the name of the executor declared above ==== +TIP: Each job can define it's owns scheduler name using `scheduler` annotation or configuration value but to prevent multiple jobs blocking each others execution, it's recommended to keep the default value. Keeping the default value will create separate executor for each job. + WARNING: For the consumer jobs, the messages are consumed synchronously so if you want to benefit from `@Fork` execution then keep the number of `maxMessages` to the default value `1`. == Distributed Jobs diff --git a/libs/micronaut-worker-executor-redis/src/test/resources/application.yml b/libs/micronaut-worker-executor-redis/src/test/resources/application.yml index d7f5c2eb..1032490c 100644 --- a/libs/micronaut-worker-executor-redis/src/test/resources/application.yml +++ b/libs/micronaut-worker-executor-redis/src/test/resources/application.yml @@ -4,54 +4,21 @@ worker: jobs: long-running-job-execute-producer: enabled: true - scheduler: long-running-job-execute-producer long-running-job-execute-on-leader: enabled: true - scheduler: long-running-job-execute-on-leader long-running-job-execute-on-follower: enabled: true - scheduler: long-running-job-execute-on-follower long-running-job-execute-consecutive: enabled: true - scheduler: long-running-job-execute-consecutive long-running-job-execute-unlimited: enabled: true - scheduler: long-running-job-execute-unlimited long-running-job-execute-concurrent: enabled: true - scheduler: long-running-job-execute-concurrent long-running-job-execute-concurrent-consumer: enabled: true - scheduler: long-running-job-execute-concurrent-consumer long-running-job-execute-fork-consumer: enabled: true - scheduler: long-running-job-execute-fork-consumer long-running-job-execute-regular-consumer: enabled: true - scheduler: long-running-job-execute-regular-consumer long-running-job-execute-fork: enabled: true - scheduler: long-running-job-execute-fork - -micronaut: - executors: - long-running-job-execute-producer: - number-of-threads: 1 - long-running-job-execute-on-leader: - number-of-threads: 1 - long-running-job-execute-on-follower: - number-of-threads: 1 - long-running-job-execute-consecutive: - number-of-threads: 1 - long-running-job-execute-unlimited: - number-of-threads: 1 - long-running-job-execute-concurrent: - number-of-threads: 1 - long-running-job-execute-concurrent-consumer: - number-of-threads: 1 - long-running-job-execute-fork-consumer: - number-of-threads: 2 - long-running-job-execute-regular-consumer: - number-of-threads: 1 - long-running-job-execute-fork: - number-of-threads: 2 diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java index 4851ebc4..f508a2e0 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java @@ -45,6 +45,9 @@ String name() default ""; /** + * The name of the task executor to use to execute the job. If default value is usd then new scheduled executor + * is created for each job with the number of threads equal to the fork value. + * * @return The name of a {@link jakarta.inject.Named} bean that is a * {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task */ diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java index e78a497d..76cb2ef1 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java @@ -45,6 +45,9 @@ String name() default ""; /** + * The name of the task executor to use to execute the job. If default value is usd then new scheduled executor + * is created for each job with the number of threads equal to the fork value. + * * @return The name of a {@link jakarta.inject.Named} bean that is a * {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task */ diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java index 7a53810d..99f83b06 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java @@ -45,6 +45,9 @@ String name() default ""; /** + * The name of the task executor to use to execute the job. If default value is usd then new scheduled executor + * is created for each job with the number of threads equal to the fork value. + * * @return The name of a {@link jakarta.inject.Named} bean that is a * {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task */ diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java index e7f1afb4..50b46313 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java @@ -45,6 +45,9 @@ String name() default ""; /** + * The name of the task executor to use to execute the job. If default value is usd then new scheduled executor + * is created for each job with the number of threads equal to the fork value. + * * @return The name of a {@link jakarta.inject.Named} bean that is a * {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task */ diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java index a36022ae..4e2dfc5c 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java @@ -83,6 +83,9 @@ String fixedRate() default ""; /** + * The name of the task executor to use to execute the job. If default value is usd then new scheduled executor + * is created for each job with the number of threads equal to the fork value. + * * @return The name of a {@link jakarta.inject.Named} bean that is a * {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task */ diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java index 6f024702..0132e7de 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueConsumer.java @@ -54,6 +54,7 @@ String type() default ""; /** + * The time to wait for the next message to be available and also the time to wait for the next run. * @return the maximum waiting time as duration string */ @AliasFor(annotation = Consumes.class, member = "value") @@ -61,6 +62,7 @@ String waitingTime() default ""; /** + * The number of messages to consume and also the number of threads to use to consume the messages. * @return the maximum of messages consumed in a single run, defaults to {@link JobConfiguration.ConsumerQueueConfiguration#DEFAULT_MAX_MESSAGES} */ @AliasFor(annotation = Fork.class, member = "value") diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueProducer.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueProducer.java index 94788d09..b4aaa7b5 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueProducer.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/convention/QueueProducer.java @@ -21,7 +21,6 @@ import com.agorapulse.worker.annotation.Job; import com.agorapulse.worker.annotation.Produces; import io.micronaut.context.annotation.AliasFor; -import jakarta.inject.Named; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -94,7 +93,10 @@ String fixedRate() default ""; /** - * @return The name of a {@link Named} bean that is a + * The name of the task executor to use to execute the job. If default value is usd then new scheduled executor + * is created for each job with the number of threads equal to the fork value. + * + * @return The name of a {@link jakarta.inject.Named} bean that is a * {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task */ @AliasFor(annotation = Job.class, member = "scheduler") diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/DefaultExecutorServiceProvider.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/DefaultExecutorServiceProvider.java new file mode 100644 index 00000000..e6021a2a --- /dev/null +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/DefaultExecutorServiceProvider.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.executor; + +import com.agorapulse.worker.Job; +import com.agorapulse.worker.JobConfiguration; +import io.micronaut.context.BeanContext; +import io.micronaut.inject.qualifiers.Qualifiers; +import io.micronaut.scheduling.ScheduledExecutorTaskScheduler; +import io.micronaut.scheduling.TaskScheduler; +import jakarta.inject.Singleton; + +import java.io.Closeable; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@Singleton +public class DefaultExecutorServiceProvider implements ExecutorServiceProvider, Closeable { + + private final List createdExecutors = new CopyOnWriteArrayList<>(); + + private final BeanContext beanContext; + + public DefaultExecutorServiceProvider(BeanContext beanContext) { + this.beanContext = beanContext; + } + + @Override + public void close() { + for (ExecutorService executor : createdExecutors) { + executor.shutdown(); + } + } + + @Override + public ExecutorService getExecutorService(Job job) { + return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork()); + } + + @Override + public TaskScheduler getTaskScheduler(Job job) { + JobConfiguration configuration = job.getConfiguration(); + + String schedulerName = ExecutorServiceProvider.getSchedulerName(configuration); + + Optional optionalTaskScheduler = beanContext.findBean(TaskScheduler.class, Qualifiers.byName(schedulerName)); + + if (optionalTaskScheduler.isEmpty()) { + optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(schedulerName)) + .filter(ScheduledExecutorService.class::isInstance) + .map(ScheduledExecutorTaskScheduler::new); + } + + return optionalTaskScheduler.orElseGet(() -> { + ExecutorService executor = getExecutor(schedulerName, configuration.getFork()); + ScheduledExecutorTaskScheduler scheduler = new ScheduledExecutorTaskScheduler(executor); + beanContext.registerSingleton(TaskScheduler.class, scheduler, Qualifiers.byName(schedulerName)); + return scheduler; + }); + } + + private ExecutorService getExecutor(String schedulerName, int fork) { + return beanContext + .findBean(ExecutorService.class, Qualifiers.byName(schedulerName)) + .orElseGet(() -> { + ExecutorService service = Executors.newScheduledThreadPool(fork, new NamedThreadFactory(schedulerName)); + + createdExecutors.add(service); + + beanContext.registerSingleton(ExecutorService.class, service, Qualifiers.byName(schedulerName)); + + return service; + }); + } +} diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/ExecutorServiceProvider.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/ExecutorServiceProvider.java new file mode 100644 index 00000000..cd3e2441 --- /dev/null +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/ExecutorServiceProvider.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.executor; + +import com.agorapulse.worker.Job; +import com.agorapulse.worker.JobConfiguration; +import com.agorapulse.worker.WorkerConfiguration; +import io.micronaut.scheduling.TaskScheduler; + +import java.util.concurrent.ExecutorService; + +public interface ExecutorServiceProvider { + + static String getSchedulerName(JobConfiguration configuration) { + return WorkerConfiguration.DEFAULT_SCHEDULER.equals(configuration.getScheduler()) ? configuration.getName() : configuration.getScheduler(); + } + + TaskScheduler getTaskScheduler(Job job); + ExecutorService getExecutorService(Job job); + +} diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/NamedThreadFactory.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/NamedThreadFactory.java new file mode 100644 index 00000000..7419ff35 --- /dev/null +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/executor/NamedThreadFactory.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.executor; + +import io.micronaut.core.util.ArgumentUtils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Creates new named threads. + *

+ * + * Copied from Micronaut codebase because of package-private visibility. + */ +class NamedThreadFactory implements ThreadFactory { + + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + /** + * The constructor. + * + * @param name new thread's prefix + */ + NamedThreadFactory(String name) { + ArgumentUtils.check("name", name).notNull(); + group = Thread.currentThread().getThreadGroup(); + namePrefix = name + "-thread-"; + } + + /** + * Constructs a new {@code Thread}. + * + * @param runnable The Runnable + * @return new thread + */ + @Override + public Thread newThread(Runnable runnable) { + Thread newThread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0); + if (newThread.isDaemon()) { + newThread.setDaemon(false); + } + if (newThread.getPriority() != Thread.NORM_PRIORITY) { + newThread.setPriority(Thread.NORM_PRIORITY); + } + return newThread; + } +} diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java index c740bedd..2ae963bf 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java @@ -19,8 +19,8 @@ import com.agorapulse.worker.Job; import com.agorapulse.worker.JobConfiguration; -import com.agorapulse.worker.JobConfigurationException; import com.agorapulse.worker.executor.DistributedJobExecutor; +import com.agorapulse.worker.executor.ExecutorServiceProvider; import com.agorapulse.worker.job.JobRunContext; import com.agorapulse.worker.queue.JobQueues; import com.agorapulse.worker.queue.QueueMessage; @@ -44,7 +44,6 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.function.Function; @Singleton @@ -56,13 +55,15 @@ public class DefaultMethodJobInvoker implements MethodJobInvoker, ApplicationEve private final Map schedulersCache = new ConcurrentHashMap<>(); private final BeanContext context; + private final ExecutorServiceProvider executorServiceProvider; private final DistributedJobExecutor distributedJobExecutor; public DefaultMethodJobInvoker( - BeanContext context, + BeanContext context, ExecutorServiceProvider executorServiceProvider, DistributedJobExecutor distributedJobExecutor ) { this.context = context; + this.executorServiceProvider = executorServiceProvider; this.distributedJobExecutor = distributedJobExecutor; } @@ -202,7 +203,7 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callba } private Scheduler getScheduler(MethodJob job) { - return schedulersCache.computeIfAbsent(job.getConfiguration().getScheduler(), s -> Schedulers.fromExecutor(getExecutor(job))); + return schedulersCache.computeIfAbsent(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), s -> Schedulers.fromExecutor(executorServiceProvider.getExecutorService(job))); } private JobQueues queues(String qualifier) { @@ -213,12 +214,4 @@ private JobQueues queues(String qualifier) { .orElseGet(() -> context.getBean(JobQueues.class)); } - private ExecutorService getExecutor(Job job) { - JobConfiguration configuration = job.getConfiguration(); - - return context - .findBean(ExecutorService.class, Qualifiers.byName(configuration.getScheduler())) - .orElseThrow(() -> new JobConfigurationException(job, "No scheduler of type TaskScheduler configured for name: " + configuration.getScheduler())); - } - } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java index 8066ffb3..4c862c89 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java @@ -21,12 +21,10 @@ import com.agorapulse.worker.JobConfiguration; import com.agorapulse.worker.JobConfigurationException; import com.agorapulse.worker.JobScheduler; +import com.agorapulse.worker.executor.ExecutorServiceProvider; import com.agorapulse.worker.job.MutableCancelableJob; -import io.micronaut.context.BeanContext; import io.micronaut.context.annotation.Requires; import io.micronaut.core.util.StringUtils; -import io.micronaut.inject.qualifiers.Qualifiers; -import io.micronaut.scheduling.ScheduledExecutorTaskScheduler; import io.micronaut.scheduling.TaskScheduler; import jakarta.inject.Singleton; import org.slf4j.Logger; @@ -34,11 +32,8 @@ import java.io.Closeable; import java.time.Duration; -import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @Singleton @@ -47,16 +42,15 @@ public class DefaultJobScheduler implements JobScheduler, Closeable { private static final Logger LOG = LoggerFactory.getLogger(DefaultJobScheduler.class); - private final BeanContext beanContext; private final Queue> scheduledTasks = new ConcurrentLinkedDeque<>(); + private final ExecutorServiceProvider executorServiceProvider; /** - * @param beanContext The bean context for DI of beans annotated with {@link jakarta.inject.Inject} */ public DefaultJobScheduler( - BeanContext beanContext + ExecutorServiceProvider executorServiceProvider ) { - this.beanContext = beanContext; + this.executorServiceProvider = executorServiceProvider; } @Override @@ -71,7 +65,7 @@ public void close() { @Override public void schedule(com.agorapulse.worker.Job job) { JobConfiguration configuration = job.getConfiguration(); - TaskScheduler taskScheduler = getTaskScheduler(job); + TaskScheduler taskScheduler = executorServiceProvider.getTaskScheduler(job); ScheduledFuture scheduled = doSchedule(job, configuration, taskScheduler); @@ -129,16 +123,4 @@ private ScheduledFuture doSchedule(Job job, JobConfiguration configuration, T throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid definition: " + configuration); } - private TaskScheduler getTaskScheduler(com.agorapulse.worker.Job job) { - JobConfiguration configuration = job.getConfiguration(); - Optional optionalTaskScheduler = beanContext.findBean(TaskScheduler.class, Qualifiers.byName(configuration.getScheduler())); - - if (!optionalTaskScheduler.isPresent()) { - optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(configuration.getScheduler())) - .filter(ScheduledExecutorService.class::isInstance) - .map(ScheduledExecutorTaskScheduler::new); - } - - return optionalTaskScheduler.orElseThrow(() -> new JobConfigurationException(job, "No scheduler of type TaskScheduler configured for name: " + configuration.getScheduler())); - } }