From 73c9436cc18ea67a4b96fc56dea1732b2d567166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Wed, 3 Jan 2024 15:34:39 +0200 Subject: [PATCH] topping a scheduled task does not stop other tasks with the same execution time. --- .github/workflows/build.yaml | 7 +- CHANGELOG.md | 12 ++ build.libraries.gradle | 4 +- gradle.properties | 2 +- tw-base-utils/build.gradle | 1 + .../DefaultExecutorServicesProvider.java | 2 +- .../SimpleScheduledTaskExecutor.java | 117 ++++++++++-------- .../SimpleScheduledTaskExecutorTest.java | 48 ++++++- 8 files changed, 130 insertions(+), 63 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index c6f97ca..6a4f8f3 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -20,10 +20,9 @@ jobs: max-parallel: 100 matrix: spring_boot_version: - - 3.1.2 - - 3.0.7 - - 2.7.13 - - 2.6.15 + - 3.1.6 + - 3.0.13 + - 2.7.18 env: SPRING_BOOT_VERSION: ${{ matrix.spring_boot_version }} steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 10249f8..8020a9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.12.2] - 2024-01-05 + +### Fixed + +* Stopping a scheduled task does not stop other tasks with the same execution time. + +## [1.12.1] - 2023-10-29 + +### Added + +* UuidUtils.add method. + ## [1.12.1] - 2023-10-29 ### Added diff --git a/build.libraries.gradle b/build.libraries.gradle index f6c7f08..81c0a2d 100644 --- a/build.libraries.gradle +++ b/build.libraries.gradle @@ -1,9 +1,10 @@ ext { - springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "2.6.15"}" + springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "2.7.18"}" libraries = [ // version defined awaitility : "org.awaitility:awaitility:4.2.0", + guava : 'com.google.guava:guava:33.0.0-jre', jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2', javaxValidationApi : "javax.validation:validation-api:2.0.1.Final", spockCore : "org.spockframework:spock-core:2.3-groovy-4.0", @@ -11,7 +12,6 @@ ext { // versions managed by spring-boot-dependencies platform commonsLang3 : "org.apache.commons:commons-lang3", - guava : 'com.google.guava:guava:31.1-jre', junitJupiter : "org.junit.jupiter:junit-jupiter", logbackClassic : "ch.qos.logback:logback-classic", lombok : "org.projectlombok:lombok", diff --git a/gradle.properties b/gradle.properties index 12624ca..138ec4f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=1.12.1 \ No newline at end of file +version=1.12.2 \ No newline at end of file diff --git a/tw-base-utils/build.gradle b/tw-base-utils/build.gradle index ef06b4a..08a0685 100644 --- a/tw-base-utils/build.gradle +++ b/tw-base-utils/build.gradle @@ -16,6 +16,7 @@ dependencies { implementation libraries.jacksonDatabind implementation libraries.jacksonJsr310 implementation libraries.jacksonJdk8 + implementation libraries.guava testImplementation libraries.awaitility testImplementation libraries.guava diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/DefaultExecutorServicesProvider.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/DefaultExecutorServicesProvider.java index 3bc4d65..4731d80 100644 --- a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/DefaultExecutorServicesProvider.java +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/DefaultExecutorServicesProvider.java @@ -22,7 +22,7 @@ public void afterPropertiesSet() { try { if (!initialized) { executorService = Executors.newCachedThreadPool(new CountingThreadFactory("tw-base")); - scheduledTaskExecutor = new SimpleScheduledTaskExecutor(null, executorService); + scheduledTaskExecutor = new SimpleScheduledTaskExecutor("gste", executorService); scheduledTaskExecutor.start(); initialized = true; } diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutor.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutor.java index cb03348..15d7a48 100644 --- a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutor.java +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutor.java @@ -1,8 +1,10 @@ package com.transferwise.common.baseutils.concurrency; +import com.google.common.util.concurrent.RateLimiter; import com.transferwise.common.baseutils.ExceptionUtils; import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; @@ -16,14 +18,15 @@ @Slf4j public class SimpleScheduledTaskExecutor implements ScheduledTaskExecutor { - private ExecutorService executorService; - private DelayQueue taskQueue; + private final ExecutorService executorService; + private final DelayQueue taskQueue; @SuppressWarnings("checkstyle:MagicNumber") private Duration tick = Duration.ofMillis(50); private Clock clock; - private Lock stateLock; - private Condition stateCondition; - private AtomicInteger workingTasksCount; + private final Lock stateLock; + private final Condition stateCondition; + private final AtomicInteger workingTasksCount; + private final RateLimiter nextTaskLoggingRateLimiter = RateLimiter.create(1); private volatile boolean started; private volatile boolean stopRequested; @@ -77,6 +80,14 @@ public void start() { executorService.submit(() -> { while (!stopRequested) { ScheduledTask scheduledTask = ExceptionUtils.doUnchecked(() -> taskQueue.poll(tick.toMillis(), TimeUnit.MILLISECONDS)); + if (log.isDebugEnabled() && scheduledTask == null) { + if (nextTaskLoggingRateLimiter.tryAcquire()) { + var nextScheduledTask = taskQueue.peek(); + if (nextScheduledTask != null) { + log.debug("Next scheduled task is executed at '{}'.", Instant.ofEpochMilli(nextScheduledTask.nextExecutionTime)); + } + } + } if (scheduledTask != null && !stopRequested) { executorService.submit(scheduledTask::execute); } @@ -130,14 +141,14 @@ private long currentTimeMillis() { private static class ScheduledTask implements Delayed { - private Runnable runnable; - private TaskHandle taskHandle; - private Duration period; + private final Runnable runnable; + private final TaskHandle taskHandle; + private final Duration period; private long nextExecutionTime; - private SimpleScheduledTaskExecutor taskExecutor; + private final SimpleScheduledTaskExecutor taskExecutor; - private Lock stateLock; - private Condition stateCondition; + private final Lock stateLock; + private final Condition stateCondition; private volatile boolean stopRequested; private volatile boolean working; @@ -148,45 +159,7 @@ private ScheduledTask(SimpleScheduledTaskExecutor taskExecutor, Runnable task, D this.stateLock = new ReentrantLock(); this.stateCondition = stateLock.newCondition(); this.taskExecutor = taskExecutor; - - createTaskHandler(); - } - - private void createTaskHandler() { - this.taskHandle = new TaskHandle() { - @Override - public void stop() { - LockUtils.withLock(stateLock, () -> { - stopRequested = true; - taskExecutor.taskQueue.remove(ScheduledTask.this); - stateCondition.signalAll(); - }); - } - - @Override - public boolean hasStopped() { - return LockUtils.withLock(stateLock, () -> stopRequested() && !working); - } - - @Override - public boolean waitUntilStopped(Duration waitTime) { - long start = taskExecutor.currentTimeMillis(); - while (taskExecutor.currentTimeMillis() >= start + waitTime.toMillis()) { - if (hasStopped()) { - return true; - } - LockUtils.withLock(stateLock, () -> ExceptionUtils.doUnchecked(() -> { - boolean ignored = stateCondition.await(start - taskExecutor.currentTimeMillis() + waitTime.toMillis(), TimeUnit.MILLISECONDS); - })); - } - return hasStopped(); - } - - @Override - public boolean isWorking() { - return working; - } - }; + this.taskHandle = new DefaultTaskHandle(); } private void execute() { @@ -236,10 +209,7 @@ public int compareTo(Delayed o) { @Override public boolean equals(Object o) { - if (o instanceof Delayed) { - return compareTo((Delayed) o) == 0; - } - return false; + return this == o; } @Override @@ -250,5 +220,44 @@ public int hashCode() { protected boolean stopRequested() { return stopRequested || taskExecutor.stopRequested; } + + class DefaultTaskHandle implements TaskHandle { + + @Override + public void stop() { + LockUtils.withLock(stateLock, () -> { + stopRequested = true; + // This is O(n) + // TODO: Think if we should just leave the things into the queue. + taskExecutor.taskQueue.remove(ScheduledTask.this); + stateCondition.signalAll(); + }); + } + + @Override + public boolean hasStopped() { + return LockUtils.withLock(stateLock, () -> stopRequested() && !working); + } + + @Override + public boolean waitUntilStopped(Duration waitTime) { + long start = taskExecutor.currentTimeMillis(); + while (taskExecutor.currentTimeMillis() >= start + waitTime.toMillis()) { + if (hasStopped()) { + return true; + } + LockUtils.withLock(stateLock, () -> ExceptionUtils.doUnchecked(() -> { + boolean ignored = stateCondition.await(start - taskExecutor.currentTimeMillis() + waitTime.toMillis(), TimeUnit.MILLISECONDS); + })); + } + return hasStopped(); + } + + @Override + public boolean isWorking() { + return working; + } + + } } } diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java index 8df418a..9239216 100644 --- a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java @@ -3,19 +3,23 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.transferwise.common.baseutils.BaseTest; import com.transferwise.common.baseutils.clock.TestClock; +import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Disabled; @@ -211,4 +215,46 @@ public void testIfSchedulingOnceWorks() { scheduledTaskExecutor.stop(); } + + /* + Covers a bug, where ScheduledTask's equals method was considering tasks with the same execution time as equals. + + Stopping one task, stopped all other tasks with the same execution time as well. + */ + @Test + public void testIfStoppingTasksWorksCorrectly() { + var testClock = new TestClock(); + Map results = new ConcurrentHashMap<>(); + + var executorService = Executors.newCachedThreadPool(); + var scheduledTaskExecutor = new SimpleScheduledTaskExecutor("test", executorService).setTick(Duration.ofMillis(5)) + .setClock(testClock); + scheduledTaskExecutor.start(); + + List taskHandleList = new ArrayList<>(); + var finishedCount = new AtomicInteger(); + for (int i = 0; i < 100; i++) { + long finalI = i; + taskHandleList.add(scheduledTaskExecutor.scheduleOnce( + () -> { + results.put(finalI, Boolean.TRUE); + finishedCount.incrementAndGet(); + }, Duration.ofSeconds(1)) + ); + } + + taskHandleList.get(50).stop(); + testClock.tick(Duration.ofMillis(1001)); + + await().until(() -> finishedCount.get() == 99); + + assertNull(results.get(50L)); + for (long i = 0; i < 100; i++) { + if (i != 50) { + assertNotNull(results.get(i)); + } + } + + scheduledTaskExecutor.stop(); + } }