Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes stopping of wrong scheduled tasks #44

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ jobs:
max-parallel: 100
matrix:
spring_boot_version:
- 3.1.2
- 3.0.7
- 2.7.13
- 2.6.15
- 3.1.6
- 3.0.13
- 2.7.18
env:
SPRING_BOOT_VERSION: ${{ matrix.spring_boot_version }}
steps:
Expand Down
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.12.2] - 2024-01-05

### Fixed

* Stopping a scheduled task does not stop other tasks with the same execution time.

## [1.12.1] - 2023-10-29

### Added

* UuidUtils.add method.

## [1.12.1] - 2023-10-29

### Added
Expand Down
4 changes: 2 additions & 2 deletions build.libraries.gradle
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
ext {
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "2.6.15"}"
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "2.7.18"}"

libraries = [
// version defined
awaitility : "org.awaitility:awaitility:4.2.0",
guava : 'com.google.guava:guava:33.0.0-jre',
jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2',
javaxValidationApi : "javax.validation:validation-api:2.0.1.Final",
spockCore : "org.spockframework:spock-core:2.3-groovy-4.0",
springBootDependencies: "org.springframework.boot:spring-boot-dependencies:${springBootVersion}",

// versions managed by spring-boot-dependencies platform
commonsLang3 : "org.apache.commons:commons-lang3",
guava : 'com.google.guava:guava:31.1-jre',
junitJupiter : "org.junit.jupiter:junit-jupiter",
logbackClassic : "ch.qos.logback:logback-classic",
lombok : "org.projectlombok:lombok",
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.12.1
version=1.12.2
1 change: 1 addition & 0 deletions tw-base-utils/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation libraries.jacksonDatabind
implementation libraries.jacksonJsr310
implementation libraries.jacksonJdk8
implementation libraries.guava

testImplementation libraries.awaitility
testImplementation libraries.guava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void afterPropertiesSet() {
try {
if (!initialized) {
executorService = Executors.newCachedThreadPool(new CountingThreadFactory("tw-base"));
scheduledTaskExecutor = new SimpleScheduledTaskExecutor(null, executorService);
scheduledTaskExecutor = new SimpleScheduledTaskExecutor("gste", executorService);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "GSTE" mean here?

Copy link
Contributor Author

@onukristo onukristo Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Global scheduled task executor.

scheduledTaskExecutor.start();
initialized = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.transferwise.common.baseutils.concurrency;

import com.google.common.util.concurrent.RateLimiter;
import com.transferwise.common.baseutils.ExceptionUtils;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
Expand All @@ -16,14 +18,15 @@
@Slf4j
public class SimpleScheduledTaskExecutor implements ScheduledTaskExecutor {

private ExecutorService executorService;
private DelayQueue<ScheduledTask> taskQueue;
private final ExecutorService executorService;
private final DelayQueue<ScheduledTask> taskQueue;
@SuppressWarnings("checkstyle:MagicNumber")
private Duration tick = Duration.ofMillis(50);
private Clock clock;
private Lock stateLock;
private Condition stateCondition;
private AtomicInteger workingTasksCount;
private final Lock stateLock;
private final Condition stateCondition;
private final AtomicInteger workingTasksCount;
private final RateLimiter nextTaskLoggingRateLimiter = RateLimiter.create(1);

private volatile boolean started;
private volatile boolean stopRequested;
Expand Down Expand Up @@ -77,6 +80,14 @@ public void start() {
executorService.submit(() -> {
while (!stopRequested) {
ScheduledTask scheduledTask = ExceptionUtils.doUnchecked(() -> taskQueue.poll(tick.toMillis(), TimeUnit.MILLISECONDS));
if (log.isDebugEnabled() && scheduledTask == null) {
if (nextTaskLoggingRateLimiter.tryAcquire()) {
var nextScheduledTask = taskQueue.peek();
if (nextScheduledTask != null) {
log.debug("Next scheduled task is executed at '{}'.", Instant.ofEpochMilli(nextScheduledTask.nextExecutionTime));
}
}
}
if (scheduledTask != null && !stopRequested) {
executorService.submit(scheduledTask::execute);
}
Expand Down Expand Up @@ -130,14 +141,14 @@ private long currentTimeMillis() {

private static class ScheduledTask implements Delayed {

private Runnable runnable;
private TaskHandle taskHandle;
private Duration period;
private final Runnable runnable;
private final TaskHandle taskHandle;
private final Duration period;
private long nextExecutionTime;
private SimpleScheduledTaskExecutor taskExecutor;
private final SimpleScheduledTaskExecutor taskExecutor;

private Lock stateLock;
private Condition stateCondition;
private final Lock stateLock;
private final Condition stateCondition;

private volatile boolean stopRequested;
private volatile boolean working;
Expand All @@ -148,45 +159,7 @@ private ScheduledTask(SimpleScheduledTaskExecutor taskExecutor, Runnable task, D
this.stateLock = new ReentrantLock();
this.stateCondition = stateLock.newCondition();
this.taskExecutor = taskExecutor;

createTaskHandler();
}

private void createTaskHandler() {
this.taskHandle = new TaskHandle() {
@Override
public void stop() {
LockUtils.withLock(stateLock, () -> {
stopRequested = true;
taskExecutor.taskQueue.remove(ScheduledTask.this);
stateCondition.signalAll();
});
}

@Override
public boolean hasStopped() {
return LockUtils.withLock(stateLock, () -> stopRequested() && !working);
}

@Override
public boolean waitUntilStopped(Duration waitTime) {
long start = taskExecutor.currentTimeMillis();
while (taskExecutor.currentTimeMillis() >= start + waitTime.toMillis()) {
if (hasStopped()) {
return true;
}
LockUtils.withLock(stateLock, () -> ExceptionUtils.doUnchecked(() -> {
boolean ignored = stateCondition.await(start - taskExecutor.currentTimeMillis() + waitTime.toMillis(), TimeUnit.MILLISECONDS);
}));
}
return hasStopped();
}

@Override
public boolean isWorking() {
return working;
}
};
this.taskHandle = new DefaultTaskHandle();
}

private void execute() {
Expand Down Expand Up @@ -236,10 +209,7 @@ public int compareTo(Delayed o) {

@Override
public boolean equals(Object o) {
if (o instanceof Delayed) {
return compareTo((Delayed) o) == 0;
}
return false;
return this == o;
}

@Override
Expand All @@ -250,5 +220,44 @@ public int hashCode() {
protected boolean stopRequested() {
return stopRequested || taskExecutor.stopRequested;
}

class DefaultTaskHandle implements TaskHandle {

@Override
public void stop() {
LockUtils.withLock(stateLock, () -> {
stopRequested = true;
// This is O(n)
// TODO: Think if we should just leave the things into the queue.
taskExecutor.taskQueue.remove(ScheduledTask.this);
stateCondition.signalAll();
});
}

@Override
public boolean hasStopped() {
return LockUtils.withLock(stateLock, () -> stopRequested() && !working);
}

@Override
public boolean waitUntilStopped(Duration waitTime) {
long start = taskExecutor.currentTimeMillis();
while (taskExecutor.currentTimeMillis() >= start + waitTime.toMillis()) {
if (hasStopped()) {
return true;
}
LockUtils.withLock(stateLock, () -> ExceptionUtils.doUnchecked(() -> {
boolean ignored = stateCondition.await(start - taskExecutor.currentTimeMillis() + waitTime.toMillis(), TimeUnit.MILLISECONDS);
}));
}
return hasStopped();
}

@Override
public boolean isWorking() {
return working;
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.BaseTest;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -211,4 +215,46 @@ public void testIfSchedulingOnceWorks() {

scheduledTaskExecutor.stop();
}

/*
Covers a bug, where ScheduledTask's equals method was considering tasks with the same execution time as equals.

Stopping one task, stopped all other tasks with the same execution time as well.
*/
@Test
public void testIfStoppingTasksWorksCorrectly() {
var testClock = new TestClock();
Map<Long, Boolean> results = new ConcurrentHashMap<>();

var executorService = Executors.newCachedThreadPool();
var scheduledTaskExecutor = new SimpleScheduledTaskExecutor("test", executorService).setTick(Duration.ofMillis(5))
.setClock(testClock);
scheduledTaskExecutor.start();

List<TaskHandle> taskHandleList = new ArrayList<>();
var finishedCount = new AtomicInteger();
for (int i = 0; i < 100; i++) {
long finalI = i;
taskHandleList.add(scheduledTaskExecutor.scheduleOnce(
() -> {
results.put(finalI, Boolean.TRUE);
finishedCount.incrementAndGet();
}, Duration.ofSeconds(1))
);
}

taskHandleList.get(50).stop();
testClock.tick(Duration.ofMillis(1001));

await().until(() -> finishedCount.get() == 99);

assertNull(results.get(50L));
for (long i = 0; i < 100; i++) {
if (i != 50) {
assertNotNull(results.get(i));
}
}

scheduledTaskExecutor.stop();
}
}
Loading