From f91cc5ea730c26438756aa2e1767a7290c918fe0 Mon Sep 17 00:00:00 2001 From: musketyr Date: Thu, 5 Dec 2024 10:41:11 +0100 Subject: [PATCH] Redis Queues resurrection --- .../src/docs/asciidoc/breaking-changes.adoc | 4 - docs/guide/src/docs/asciidoc/index.adoc | 3 - .../guide/src/docs/asciidoc/installation.adoc | 3 + .../micronaut-worker-queues-redis.gradle | 24 +++ .../queues/redis/ConnectionFactory.java | 49 +++++ .../queues/redis/RedisPoolConfiguration.java | 81 ++++++++ .../worker/queues/redis/RedisQueues.java | 177 ++++++++++++++++++ .../queues/redis/RedisQueuesFactory.java | 47 +++++ .../queues/redis/RedisQueuesSpec.groovy | 56 ++++++ 9 files changed, 437 insertions(+), 7 deletions(-) delete mode 100644 docs/guide/src/docs/asciidoc/breaking-changes.adoc create mode 100644 libs/micronaut-worker-queues-redis/micronaut-worker-queues-redis.gradle create mode 100644 libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/ConnectionFactory.java create mode 100644 libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisPoolConfiguration.java create mode 100644 libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java create mode 100644 libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueuesFactory.java create mode 100644 libs/micronaut-worker-queues-redis/src/test/groovy/com/agorapulse/worker/queues/redis/RedisQueuesSpec.groovy diff --git a/docs/guide/src/docs/asciidoc/breaking-changes.adoc b/docs/guide/src/docs/asciidoc/breaking-changes.adoc deleted file mode 100644 index b9671876..00000000 --- a/docs/guide/src/docs/asciidoc/breaking-changes.adoc +++ /dev/null @@ -1,4 +0,0 @@ -== 3.0.0 -* removed Redis Queue implementation due the minimal use and complexity of the implementation -* removed deprecated `JobQueue#readMessages(String, int, Duration, Argument, Consumer)` method -* reading queues returns `QueueMessage` instead of the message itself and the underlying implementation must properly call `delete` or `requeue` on the message to either remove it from the queue or return it back to the queue for processing diff --git a/docs/guide/src/docs/asciidoc/index.adoc b/docs/guide/src/docs/asciidoc/index.adoc index fc90ed1c..285f8230 100644 --- a/docs/guide/src/docs/asciidoc/index.adoc +++ b/docs/guide/src/docs/asciidoc/index.adoc @@ -17,9 +17,6 @@ include::{includedir}/introduction.adoc[] include::{includedir}/installation.adoc[] include::{includedir}/usage.adoc[] -= Breaking Changes -include::{includedir}/breaking-changes.adoc[] - = Links link:api/index.html[Javadoc, window="_blank"] diff --git a/docs/guide/src/docs/asciidoc/installation.adoc b/docs/guide/src/docs/asciidoc/installation.adoc index 59207c5c..dcd4e4cc 100644 --- a/docs/guide/src/docs/asciidoc/installation.adoc +++ b/docs/guide/src/docs/asciidoc/installation.adoc @@ -30,5 +30,8 @@ dependencies { // to enable Redis leader/follower capabilities implementation("com.agorapulse:micronaut-worker-executor-redis:{project-version}") + + // to enable Redis queues integration + implementation("com.agorapulse:micronaut-worker-queues-redis:{project-version}") } ---- diff --git a/libs/micronaut-worker-queues-redis/micronaut-worker-queues-redis.gradle b/libs/micronaut-worker-queues-redis/micronaut-worker-queues-redis.gradle new file mode 100644 index 00000000..80d9aa07 --- /dev/null +++ b/libs/micronaut-worker-queues-redis/micronaut-worker-queues-redis.gradle @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +dependencies { + api project(':micronaut-worker') + implementation 'io.micronaut.redis:micronaut-redis-lettuce' + implementation 'io.micronaut:micronaut-jackson-databind' + + testImplementation project(':micronaut-worker-tck') +} diff --git a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/ConnectionFactory.java b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/ConnectionFactory.java new file mode 100644 index 00000000..2369fdc3 --- /dev/null +++ b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/ConnectionFactory.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.queues.redis; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.support.AsyncObjectFactory; + +import java.util.concurrent.CompletableFuture; + +public class ConnectionFactory implements AsyncObjectFactory> { + + private final RedisClient client; + + public ConnectionFactory(RedisClient client) { + this.client = client; + } + + @Override + public CompletableFuture> create() { + return CompletableFuture.completedFuture(client.connect()); + } + + @Override + public CompletableFuture destroy(StatefulRedisConnection object) { + return object.closeAsync(); + } + + @Override + public CompletableFuture validate(StatefulRedisConnection object) { + return CompletableFuture.completedFuture(object.isOpen()); + } + +} diff --git a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisPoolConfiguration.java b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisPoolConfiguration.java new file mode 100644 index 00000000..aa2ef0ff --- /dev/null +++ b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisPoolConfiguration.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.queues.redis; + +import io.lettuce.core.support.BasePoolConfig; +import io.lettuce.core.support.BoundedPoolConfig; +import io.micronaut.context.annotation.ConfigurationProperties; + +@ConfigurationProperties("redis.pool") +public class RedisPoolConfiguration { + + private int minIdle = BoundedPoolConfig.DEFAULT_MIN_IDLE; + private int maxIdle = BoundedPoolConfig.DEFAULT_MAX_IDLE; + private int maxTotal = BoundedPoolConfig.DEFAULT_MAX_TOTAL; + private boolean testOnCreate = BasePoolConfig.DEFAULT_TEST_ON_CREATE; + private boolean testOnAcquire = BasePoolConfig.DEFAULT_TEST_ON_ACQUIRE; + private boolean testOnRelease = BasePoolConfig.DEFAULT_TEST_ON_RELEASE; + + public int getMinIdle() { + return minIdle; + } + + public void setMinIdle(int minIdle) { + this.minIdle = minIdle; + } + + public int getMaxIdle() { + return maxIdle; + } + + public void setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + } + + public int getMaxTotal() { + return maxTotal; + } + + public void setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + } + + public boolean isTestOnCreate() { + return testOnCreate; + } + + public void setTestOnCreate(boolean testOnCreate) { + this.testOnCreate = testOnCreate; + } + + public boolean isTestOnAcquire() { + return testOnAcquire; + } + + public void setTestOnAcquire(boolean testOnAcquire) { + this.testOnAcquire = testOnAcquire; + } + + public boolean isTestOnRelease() { + return testOnRelease; + } + + public void setTestOnRelease(boolean testOnRelease) { + this.testOnRelease = testOnRelease; + } +} diff --git a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java new file mode 100644 index 00000000..3cb87bb1 --- /dev/null +++ b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java @@ -0,0 +1,177 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.queues.redis; + +import com.agorapulse.worker.queue.JobQueues; +import com.agorapulse.worker.queue.QueueMessage; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.lettuce.core.RedisClient; +import io.lettuce.core.TransactionResult; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.support.BoundedAsyncPool; +import io.lettuce.core.support.BoundedPoolConfig; +import io.micronaut.core.type.Argument; +import io.micronaut.jackson.JacksonConfiguration; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +public class RedisQueues implements JobQueues { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisQueues.class); + private static final String PREFIX_DATED_QUEUE = "DATED_QUEUE::"; + + private final ObjectMapper objectMapper; + private final BoundedAsyncPool> pool; + + public RedisQueues(ObjectMapper objectMapper, RedisClient client, RedisPoolConfiguration redisPoolConfiguration) { + this.objectMapper = objectMapper; + + BoundedPoolConfig config = BoundedPoolConfig + .builder() + .minIdle(redisPoolConfiguration.getMinIdle()) + .maxIdle(redisPoolConfiguration.getMaxIdle()) + .maxTotal(redisPoolConfiguration.getMaxTotal()) + .testOnAcquire(redisPoolConfiguration.isTestOnAcquire()) + .testOnCreate(redisPoolConfiguration.isTestOnCreate()) + .testOnRelease(redisPoolConfiguration.isTestOnRelease()) + .build(); + + pool = new BoundedAsyncPool<>(new ConnectionFactory(client), config); + } + + @SuppressWarnings("unchecked") + @Override + public Publisher> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { + TransactionResult result = withTransaction(redisCommands -> { + String key = getKey(queueName); + redisCommands.zrange(key, 0, maxNumberOfMessages - 1L); + redisCommands.zremrangebyrank(key, 0, maxNumberOfMessages - 1L); + }); + + if (result == null) { + return Flux.empty(); + } + + + Object firstResponse = result.get(0); + + if (!(firstResponse instanceof List)) { + throw new IllegalStateException("There result is not a list of Strings. Got: " + firstResponse); + } + + List messages = (List) firstResponse; + + return Flux.fromIterable(messages).handle((body, sink) -> { + try { + T message = objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())); + QueueMessage queueMessage = QueueMessage.alwaysRequeue( + UUID.randomUUID().toString(), + message, + () -> {}, + () -> sendRawMessage(queueName, body) + ); + sink.next(queueMessage); + } catch (JsonProcessingException e) { + if (argument.equalsType(Argument.STRING)) { + QueueMessage queueMessage = QueueMessage.alwaysRequeue( + UUID.randomUUID().toString(), + (T) body, + () -> {}, + () -> sendRawMessage(queueName, body) + ); + sink.next(queueMessage); + } else { + sink.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e)); + } + } + }); + } + + @Override + public void sendMessage(String queueName, Object result) { + try { + String item = objectMapper.writeValueAsString(result); + sendRawMessage(queueName, item); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Cannot write " + result + " to JSON", e); + } + } + + @Override + public void sendRawMessage(String queueName, Object result) { + String item = result.toString(); + withRedis(redisCommands -> { + String key = getKey(queueName); + redisCommands.zscore(key, item).thenAccept(zscore -> { + if (zscore == null) { + long time = System.currentTimeMillis(); + redisCommands.zadd(key, time, item); + } + }); + }); + } + + private String getKey(String queueName) { + return PREFIX_DATED_QUEUE + queueName; + } + + private TransactionResult withTransaction(Consumer> action) { + try { + StatefulRedisConnection connection = pool.acquire().get(); + RedisCommands sync = connection.sync(); + try { + sync.multi(); + action.accept(sync); + return sync.exec(); + } finally { + pool.release(connection); + } + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Exception obtaining connection from the pool", e); + return null; + } + } + + private void withRedis( + Consumer> action + ) { + try { + StatefulRedisConnection connection = pool.acquire().get(); + RedisAsyncCommands sync = connection.async(); + try { + action.accept(sync); + } finally { + pool.release(connection); + } + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Exception obtaining connection from the pool", e); + } + } + +} diff --git a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueuesFactory.java b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueuesFactory.java new file mode 100644 index 00000000..16eec5f0 --- /dev/null +++ b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueuesFactory.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.queues.redis; + +import com.agorapulse.worker.queue.JobQueues; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.lettuce.core.RedisClient; +import io.micronaut.context.annotation.Bean; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; + +import jakarta.inject.Named; +import jakarta.inject.Singleton; + +@Factory +@Requires(classes = { RedisClient.class }, beans = { RedisClient.class }, property = "redis.uri") +public class RedisQueuesFactory { + + + @Bean + @Singleton + @Named("redis") + @Requires(property = "worker.queues.redis.enabled", value = "true", defaultValue = "true") + public JobQueues redisQueues( + RedisClient redisClient, + ObjectMapper mapper, + RedisPoolConfiguration redisPoolConfiguration + ) { + return new RedisQueues(mapper, redisClient, redisPoolConfiguration); + } + +} diff --git a/libs/micronaut-worker-queues-redis/src/test/groovy/com/agorapulse/worker/queues/redis/RedisQueuesSpec.groovy b/libs/micronaut-worker-queues-redis/src/test/groovy/com/agorapulse/worker/queues/redis/RedisQueuesSpec.groovy new file mode 100644 index 00000000..e4313607 --- /dev/null +++ b/libs/micronaut-worker-queues-redis/src/test/groovy/com/agorapulse/worker/queues/redis/RedisQueuesSpec.groovy @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.queues.redis + +import com.agorapulse.worker.tck.queue.AbstractQueuesSpec +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.micronaut.test.support.TestPropertyProvider +import org.testcontainers.containers.GenericContainer +import spock.lang.Retry +import spock.lang.Shared + +/** + * Tests for Redis queues. + */ +@Retry(delay = 500) +@MicronautTest(environments = QUEUE_SPEC_ENV_NAME) +class RedisQueuesSpec extends AbstractQueuesSpec implements TestPropertyProvider { + + @Shared static GenericContainer redis = new GenericContainer('redis:3-alpine').withExposedPorts(6379) + + static { + redis.start() + } + + @SuppressWarnings('GetterMethodCouldBeProperty') + Class getExpectedImplementation() { return RedisQueues } + + @Override + @SuppressWarnings('GetterMethodCouldBeProperty') + String getName() { return 'redis' } + + @Override + Map getProperties() { + return [ + 'redis.uri' : "redis://$redis.host:${redis.getMappedPort(6379)}", + 'worker.jobs.send-words-job-listen.enabled': 'true', + 'worker.jobs.send-words-job-hello.enabled' : 'true', + ] + } + +}