Skip to content

Commit

Permalink
use dedicated thread pools for jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Dec 10, 2024
1 parent 423dd33 commit 66522e6
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 70 deletions.
2 changes: 2 additions & 0 deletions docs/guide/src/docs/asciidoc/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ worker:
<4> The scheduler definition using the name of the executor declared above
====

TIP: Each job can define it's owns scheduler name using `scheduler` annotation or configuration value but to prevent multiple jobs blocking each others execution, it's recommended to keep the default value. Keeping the default value will create separate executor for each job.

WARNING: For the consumer jobs, the messages are consumed synchronously so if you want to benefit from `@Fork` execution then keep the number of `maxMessages` to the default value `1`.

== Distributed Jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,21 @@ worker:
jobs:
long-running-job-execute-producer:
enabled: true
scheduler: long-running-job-execute-producer
long-running-job-execute-on-leader:
enabled: true
scheduler: long-running-job-execute-on-leader
long-running-job-execute-on-follower:
enabled: true
scheduler: long-running-job-execute-on-follower
long-running-job-execute-consecutive:
enabled: true
scheduler: long-running-job-execute-consecutive
long-running-job-execute-unlimited:
enabled: true
scheduler: long-running-job-execute-unlimited
long-running-job-execute-concurrent:
enabled: true
scheduler: long-running-job-execute-concurrent
long-running-job-execute-concurrent-consumer:
enabled: true
scheduler: long-running-job-execute-concurrent-consumer
long-running-job-execute-fork-consumer:
enabled: true
scheduler: long-running-job-execute-fork-consumer
long-running-job-execute-regular-consumer:
enabled: true
scheduler: long-running-job-execute-regular-consumer
long-running-job-execute-fork:
enabled: true
scheduler: long-running-job-execute-fork

micronaut:
executors:
long-running-job-execute-producer:
number-of-threads: 1
long-running-job-execute-on-leader:
number-of-threads: 1
long-running-job-execute-on-follower:
number-of-threads: 1
long-running-job-execute-consecutive:
number-of-threads: 1
long-running-job-execute-unlimited:
number-of-threads: 1
long-running-job-execute-concurrent:
number-of-threads: 1
long-running-job-execute-concurrent-consumer:
number-of-threads: 1
long-running-job-execute-fork-consumer:
number-of-threads: 2
long-running-job-execute-regular-consumer:
number-of-threads: 1
long-running-job-execute-fork:
number-of-threads: 2
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
String name() default "";

/**
* The name of the task executor to use to execute the job. If default value is usd then new scheduled executor
* is created for each job with the number of threads equal to the fork value.
*
* @return The name of a {@link jakarta.inject.Named} bean that is a
* {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
String name() default "";

/**
* The name of the task executor to use to execute the job. If default value is usd then new scheduled executor
* is created for each job with the number of threads equal to the fork value.
*
* @return The name of a {@link jakarta.inject.Named} bean that is a
* {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
String name() default "";

/**
* The name of the task executor to use to execute the job. If default value is usd then new scheduled executor
* is created for each job with the number of threads equal to the fork value.
*
* @return The name of a {@link jakarta.inject.Named} bean that is a
* {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
String name() default "";

/**
* The name of the task executor to use to execute the job. If default value is usd then new scheduled executor
* is created for each job with the number of threads equal to the fork value.
*
* @return The name of a {@link jakarta.inject.Named} bean that is a
* {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
String fixedRate() default "";

/**
* The name of the task executor to use to execute the job. If default value is usd then new scheduled executor
* is created for each job with the number of threads equal to the fork value.
*
* @return The name of a {@link jakarta.inject.Named} bean that is a
* {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@
String type() default "";

/**
* The time to wait for the next message to be available and also the time to wait for the next run.
* @return the maximum waiting time as duration string
*/
@AliasFor(annotation = Consumes.class, member = "value")
@AliasFor(annotation = FixedRate.class, member = "value")
String waitingTime() default "";

/**
* The number of messages to consume and also the number of threads to use to consume the messages.
* @return the maximum of messages consumed in a single run, defaults to {@link JobConfiguration.ConsumerQueueConfiguration#DEFAULT_MAX_MESSAGES}
*/
@AliasFor(annotation = Fork.class, member = "value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.agorapulse.worker.annotation.Job;
import com.agorapulse.worker.annotation.Produces;
import io.micronaut.context.annotation.AliasFor;
import jakarta.inject.Named;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
Expand Down Expand Up @@ -94,7 +93,10 @@
String fixedRate() default "";

/**
* @return The name of a {@link Named} bean that is a
* The name of the task executor to use to execute the job. If default value is usd then new scheduled executor
* is created for each job with the number of threads equal to the fork value.
*
* @return The name of a {@link jakarta.inject.Named} bean that is a
* {@link java.util.concurrent.ScheduledExecutorService} to use to schedule the task
*/
@AliasFor(annotation = Job.class, member = "scheduler")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.executor;

import com.agorapulse.worker.Job;
import com.agorapulse.worker.JobConfiguration;
import io.micronaut.context.BeanContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.scheduling.ScheduledExecutorTaskScheduler;
import io.micronaut.scheduling.TaskScheduler;
import jakarta.inject.Singleton;

import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@Singleton
public class DefaultExecutorServiceProvider implements ExecutorServiceProvider, Closeable {

private final List<ExecutorService> createdExecutors = new CopyOnWriteArrayList<>();

private final BeanContext beanContext;

public DefaultExecutorServiceProvider(BeanContext beanContext) {
this.beanContext = beanContext;
}

@Override
public void close() {
for (ExecutorService executor : createdExecutors) {
executor.shutdown();
}
}

@Override
public ExecutorService getExecutorService(Job job) {
return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork());
}

@Override
public TaskScheduler getTaskScheduler(Job job) {
JobConfiguration configuration = job.getConfiguration();

String schedulerName = ExecutorServiceProvider.getSchedulerName(configuration);

Optional<TaskScheduler> optionalTaskScheduler = beanContext.findBean(TaskScheduler.class, Qualifiers.byName(schedulerName));

if (optionalTaskScheduler.isEmpty()) {
optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(schedulerName))
.filter(ScheduledExecutorService.class::isInstance)
.map(ScheduledExecutorTaskScheduler::new);
}

return optionalTaskScheduler.orElseGet(() -> {
ExecutorService executor = getExecutor(schedulerName, configuration.getFork());
ScheduledExecutorTaskScheduler scheduler = new ScheduledExecutorTaskScheduler(executor);
beanContext.registerSingleton(TaskScheduler.class, scheduler, Qualifiers.byName(schedulerName));
return scheduler;
});
}

private ExecutorService getExecutor(String schedulerName, int fork) {
return beanContext
.findBean(ExecutorService.class, Qualifiers.byName(schedulerName))
.orElseGet(() -> {
ExecutorService service = Executors.newScheduledThreadPool(fork, new NamedThreadFactory(schedulerName));

createdExecutors.add(service);

beanContext.registerSingleton(ExecutorService.class, service, Qualifiers.byName(schedulerName));

return service;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.executor;

import com.agorapulse.worker.Job;
import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.WorkerConfiguration;
import io.micronaut.scheduling.TaskScheduler;

import java.util.concurrent.ExecutorService;

public interface ExecutorServiceProvider {

static String getSchedulerName(JobConfiguration configuration) {
return WorkerConfiguration.DEFAULT_SCHEDULER.equals(configuration.getScheduler()) ? configuration.getName() : configuration.getScheduler();
}

TaskScheduler getTaskScheduler(Job job);
ExecutorService getExecutorService(Job job);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.executor;

import io.micronaut.core.util.ArgumentUtils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Creates new named threads.
* <p>
*
* Copied from Micronaut codebase because of package-private visibility.
*/
class NamedThreadFactory implements ThreadFactory {

private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

/**
* The constructor.
*
* @param name new thread's prefix
*/
NamedThreadFactory(String name) {
ArgumentUtils.check("name", name).notNull();
group = Thread.currentThread().getThreadGroup();
namePrefix = name + "-thread-";
}

/**
* Constructs a new {@code Thread}.
*
* @param runnable The Runnable
* @return new thread
*/
@Override
public Thread newThread(Runnable runnable) {
Thread newThread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
if (newThread.isDaemon()) {
newThread.setDaemon(false);
}
if (newThread.getPriority() != Thread.NORM_PRIORITY) {
newThread.setPriority(Thread.NORM_PRIORITY);
}
return newThread;
}
}
Loading

0 comments on commit 66522e6

Please sign in to comment.