Skip to content

Commit

Permalink
Merge pull request #43 from agorapulse/fix/on-leader-only-not-working
Browse files Browse the repository at this point in the history
Improved reliability of leader selection
  • Loading branch information
musketyr authored Nov 28, 2024
2 parents ea18e80 + 205f389 commit 0bd32b1
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import com.agorapulse.worker.Job;
import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.JobManager;
import com.agorapulse.worker.event.JobExecutorEvent;
import com.agorapulse.worker.executor.DistributedJobExecutor;
import com.agorapulse.worker.executor.ExecutorId;
import com.agorapulse.worker.job.JobRunContext;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
Expand All @@ -36,8 +39,10 @@
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;

@Singleton
@Requires(beans = {StatefulRedisConnection.class}, property = "redis.uri")
Expand All @@ -46,6 +51,8 @@ public class RedisJobExecutor implements DistributedJobExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(RedisJobExecutor.class);

private static final String EXECUTOR_TYPE = "redis";

private static final String LIBRARY_PREFIX = "APMW::";
private static final String PREFIX_LEADER = LIBRARY_PREFIX + "LEADER::";
private static final String PREFIX_COUNT = LIBRARY_PREFIX + "COUNT::";
Expand All @@ -72,50 +79,64 @@ public class RedisJobExecutor implements DistributedJobExecutor {
private final ExecutorId executorId;
private final BeanContext beanContext;
private final JobManager jobManager;

public RedisJobExecutor(StatefulRedisConnection<String, String> connection, ExecutorId executorId, BeanContext beanContext, JobManager jobManager) {
private final ApplicationEventPublisher<JobExecutorEvent> eventPublisher;

public RedisJobExecutor(
StatefulRedisConnection<String, String> connection,
ExecutorId executorId,
BeanContext beanContext,
JobManager jobManager,
ApplicationEventPublisher<JobExecutorEvent> eventPublisher
) {
this.connection = connection;
this.executorId = executorId;
this.beanContext = beanContext;
this.jobManager = jobManager;
this.eventPublisher = eventPublisher;
}

@Override
public <R> Publisher<R> executeOnlyOnLeader(String jobName, Callable<R> supplier) {
public <R> Publisher<R> executeOnlyOnLeader(JobRunContext context, Callable<R> supplier) {
RedisAsyncCommands<String, String> commands = connection.async();

return readMasterHostname(jobName, commands).flatMap(h -> {
return readMasterHostname(context.getStatus().getName(), commands).flatMap(h -> {
if (executorId.id().equals(h)) {
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
eventPublisher.publishEvent(JobExecutorEvent.leaderOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.EXECUTE, context.getStatus(), executorId.id()));
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(context.getStatus().getName())));
}
eventPublisher.publishEvent(JobExecutorEvent.leaderOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), executorId.id()));
return Mono.empty();
}).flux();
}

@Override
public <R> Publisher<R> executeConcurrently(String jobName, int maxConcurrency, Callable<R> supplier) {
public <R> Publisher<R> executeConcurrently(JobRunContext context, int maxConcurrency, Callable<R> supplier) {
RedisAsyncCommands<String, String> commands = connection.async();
return readAndIncreaseCurrentCount(jobName, commands, maxConcurrency <= 1 ? LOCK_TIMEOUT : COUNT_TIMEOUT)
return readAndIncreaseCurrentCount(context.getStatus().getName(), commands, maxConcurrency <= 1 ? LOCK_TIMEOUT : COUNT_TIMEOUT)
.flatMap(count -> {
if (count > maxConcurrency) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Skipping execution of the job {} as the concurrency level {} is already reached", jobName, maxConcurrency);
LOGGER.trace("Skipping execution of the job {} as the concurrency level {} is already reached", context.getStatus().getName(), maxConcurrency);
}
return decreaseCurrentExecutionCount(jobName, commands).flatMap(decreased -> Mono.empty());
eventPublisher.publishEvent(JobExecutorEvent.concurrent(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), maxConcurrency, executorId.id()));
return decreaseCurrentExecutionCount(context.getStatus().getName(), commands).flatMap(decreased -> Mono.empty());
}

return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName))).doFinally(signal -> decreaseCurrentExecutionCount(jobName, commands).subscribe());
context.onFinished(s-> decreaseCurrentExecutionCount(s.getName(), commands).subscribe());
eventPublisher.publishEvent(JobExecutorEvent.concurrent(EXECUTOR_TYPE, JobExecutorEvent.Execution.EXECUTE, context.getStatus(), maxConcurrency, executorId.id()));
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(context.getStatus().getName())));
}).flux();
}

@Override
public <R> Publisher<R> executeOnlyOnFollower(String jobName, Callable<R> supplier) {
public <R> Publisher<R> executeOnlyOnFollower(JobRunContext context, Callable<R> supplier) {
RedisAsyncCommands<String, String> commands = connection.async();
return readMasterHostname(jobName, commands).flatMap(h -> {
return readMasterHostname(context.getStatus().getName(), commands).flatMap(h -> {
if (!"".equals(h) && h.equals(executorId.id())) {
eventPublisher.publishEvent(JobExecutorEvent.followerOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), executorId.id()));
return Mono.empty();
}
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
eventPublisher.publishEvent(JobExecutorEvent.followerOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.EXECUTE, context.getStatus(), executorId.id()));
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(context.getStatus().getName())));
}).flux();
}

Expand All @@ -137,11 +158,14 @@ private static Mono<Long> decreaseCurrentExecutionCount(String jobName, RedisAsy
}

private Mono<Object> readMasterHostname(String jobName, RedisAsyncCommands<String, String> commands) {
return Mono.fromFuture(commands.eval(
LEADER_CHECK,
ScriptOutputType.VALUE,
PREFIX_LEADER + jobName, executorId.id(), String.valueOf(LEADER_INACTIVITY_TIMEOUT)
).toCompletableFuture()).defaultIfEmpty("");
int randomDelay = ThreadLocalRandom.current().nextInt(1, 500);
return Mono.delay(Duration.ofMillis(randomDelay))
.flatMap(ignored -> Mono.fromFuture(commands.eval(
LEADER_CHECK,
ScriptOutputType.VALUE,
PREFIX_LEADER + jobName, executorId.id(), String.valueOf(LEADER_INACTIVITY_TIMEOUT)
).toCompletableFuture()))
.defaultIfEmpty("");
}

private ExecutorService getExecutorService(String jobName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package com.agorapulse.worker.redis

import com.agorapulse.worker.event.JobExecutorEvent
import com.agorapulse.worker.executor.ExecutorId
import com.agorapulse.worker.tck.executor.AbstractJobExecutorSpec
import com.agorapulse.worker.tck.executor.JobExecutorEventCollector
import io.micronaut.context.ApplicationContext
import org.testcontainers.containers.GenericContainer
import org.testcontainers.spock.Testcontainers
Expand Down Expand Up @@ -56,4 +58,27 @@ class RedisJobExecutorSpec extends AbstractJobExecutorSpec {
return ctx.start()
}

@Override
@SuppressWarnings('UnnecessaryCollectCall')
protected boolean verifyExecutorEvents(
ApplicationContext first,
ApplicationContext second,
ApplicationContext third
) {
List<JobExecutorEvent> allEvents = [first, second, third].collect { it.getBean(JobExecutorEventCollector) }.collectMany { it.events }
assert allEvents.every { it.executor == 'redis' }

List<JobExecutorEvent> leaderEvents = allEvents.findAll { it.status.name == 'long-running-job-execute-on-leader' }

assert leaderEvents.count { it.execution == JobExecutorEvent.Execution.SKIP } == 2
assert leaderEvents.count { it.execution == JobExecutorEvent.Execution.EXECUTE } == 1

List<JobExecutorEvent> producerEvents = allEvents.findAll { it.status.name == 'long-running-job-execute-producer' }

assert producerEvents.count { it.execution == JobExecutorEvent.Execution.SKIP } == 2
assert producerEvents.count { it.execution == JobExecutorEvent.Execution.EXECUTE } == 1

return true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ abstract class AbstractJobExecutorSpec extends Specification {
jobs.count { it.producer.get() == 1 } == 1
}

verifyExecutorEvents(one, two, three)

cleanup:
closeQuietly one, two, three
}

protected abstract ApplicationContext buildContext()
protected abstract Class<?> getRequiredExecutorType()
protected abstract boolean verifyExecutorEvents(ApplicationContext first, ApplicationContext second, ApplicationContext third)

// some implementation may not support followers, such as the local implementation
protected int getExpectedFollowersCount() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.tck.executor;

import com.agorapulse.worker.event.JobExecutorEvent;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.runtime.context.scope.Refreshable;
import io.micronaut.runtime.context.scope.refresh.RefreshEvent;
import io.micronaut.runtime.context.scope.refresh.RefreshEventListener;
import io.micronaut.runtime.event.annotation.EventListener;
import jakarta.inject.Singleton;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

@Primary
@Singleton
@Refreshable
public class JobExecutorEventCollector implements ApplicationEventPublisher<JobExecutorEvent>, RefreshEventListener {

private final List<JobExecutorEvent> events = new ArrayList<>();

/**
* For usage from micronaut tests.
* @param event the event to be collected
*/
@EventListener
void onEvent(JobExecutorEvent event) {
events.add(event);
}

/**
* For direct usage in unit tests
* @param event The event to publish
*/
@Override
public void publishEvent(@NonNull JobExecutorEvent event) {
events.add(event);
}

@Override
public void onApplicationEvent(RefreshEvent event) {
events.clear();
}

public List<JobExecutorEvent> getEvents() {
return events;
}

@Override
public @NonNull Set<String> getObservedConfigurationPrefixes() {
return Set.of();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.event;

import com.agorapulse.worker.JobRunStatus;

public class JobExecutorEvent {

public enum Execution {
EXECUTE, SKIP
}

public enum Type {
LEADER_ONLY, FOLLOWER_ONLY, CONCURRENT
}

public static JobExecutorEvent leaderOnly(String executor, Execution outcome, JobRunStatus status, String executorId) {
return new JobExecutorEvent(executor, Type.LEADER_ONLY, outcome, status, 0, executorId);
}

public static JobExecutorEvent followerOnly(String executor, Execution outcome, JobRunStatus status, String executorId) {
return new JobExecutorEvent(executor, Type.FOLLOWER_ONLY, outcome, status, 0, executorId);
}

public static JobExecutorEvent concurrent(String executor, Execution outcome, JobRunStatus status, int concurrency, String executorId) {
return new JobExecutorEvent(executor, Type.CONCURRENT, outcome, status, concurrency, executorId);
}

private final String executor;
private final JobRunStatus status;
private final int concurrency;
private final Execution execution;
private final Type type;
private final String executorId;

public JobExecutorEvent(String executor, Type type, Execution execution, JobRunStatus status, int concurrency, String executorId) {
this.executor = executor;
this.status = status;
this.concurrency = concurrency;
this.execution = execution;
this.type = type;
this.executorId = executorId;
}

public String getExecutor() {
return executor;
}

public JobRunStatus getStatus() {
return status;
}

public int getConcurrency() {
return concurrency;
}

public Execution getExecution() {
return execution;
}

public Type getType() {
return type;
}

public String getExecutorId() {
return executorId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.agorapulse.worker.executor;

import com.agorapulse.worker.job.JobRunContext;
import io.micronaut.validation.Validated;
import org.reactivestreams.Publisher;

Expand All @@ -28,32 +29,32 @@ public interface DistributedJobExecutor {
/**
* Executes the tasks only on the leader server.
*
* @param jobName the name of the job
* @param context the execution context
* @param task the task to be performed
* @param <R> the type of the task's result
* @return publisher which calls to the original supplier or empty publisher if the task should not be executed
*/
<R> Publisher<R> executeOnlyOnLeader(String jobName, Callable<R> task);
<R> Publisher<R> executeOnlyOnLeader(JobRunContext context, Callable<R> task);

/**
* Executes the tasks only if it's not already running.
*
* @param jobName the name of the job
* @param context the execution context
* @param concurrency the maximal count of jobs running at the same time
* @param task the task to be performed
* @param <R> the type of the task's result
* @return publisher which calls the original supplier or empty publisher if the task should not be executed
*/
<R> Publisher<R> executeConcurrently(String jobName, int concurrency, Callable<R> task);
<R> Publisher<R> executeConcurrently(JobRunContext context, int concurrency, Callable<R> task);

/**
* Executes the tasks only on the follower server.
*
* @param jobName the name of the job
* @param context the execution context
* @param task the task to be performed
* @param <R> the type of the task's result
* @return publisher which calls the original supplier or empty publisher if the task should not be executed
*/
<R> Publisher<R> executeOnlyOnFollower(String jobName, Callable<R> task);
<R> Publisher<R> executeOnlyOnFollower(JobRunContext context, Callable<R> task);

}
Loading

0 comments on commit 0bd32b1

Please sign in to comment.