From 381793558cedc254af7e7a5e159da3cd164bbebc Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Wed, 20 Dec 2023 14:01:45 +0100 Subject: [PATCH 1/3] Bump to Vert.x 4.5.1 --- pom.xml | 2 +- .../messaging/kafka/impl/KafkaSource.java | 26 +++++++------------ .../commit/RedisCheckpointStateStore.java | 4 +-- .../pulsar/PulsarIncomingChannel.java | 20 ++++++-------- 4 files changed, 19 insertions(+), 33 deletions(-) diff --git a/pom.xml b/pom.xml index 5267913255..d0a9809e28 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ 11 11 - 4.4.5 + 4.5.1 2.2.21 1.1.0 5.1.2.Final diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index d9736b2795..06c6d35dd1 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -7,14 +7,7 @@ import static io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.findMatchingListener; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -32,13 +25,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.health.HealthReport; -import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler; -import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; -import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordBatch; -import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; -import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; -import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; -import io.smallrye.reactive.messaging.kafka.KafkaRecord; +import io.smallrye.reactive.messaging.kafka.*; import io.smallrye.reactive.messaging.kafka.commit.ContextHolder; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue; @@ -47,7 +34,7 @@ import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth; import io.smallrye.reactive.messaging.kafka.tracing.KafkaOpenTelemetryInstrumenter; import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace; -import io.vertx.core.impl.EventLoopContext; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -74,7 +61,12 @@ public class KafkaSource { private final Instance> deserializationFailureHandlers; private final Instance consumerRebalanceListeners; private final ReactiveKafkaConsumer client; - private final EventLoopContext context; + + /** + * This field stores the event loop context. + * Using {@code ContextInternal} to distinguish it from the {@code Context} used by the user. + */ + private final ContextInternal context; private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter; diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java index 16ab5dbd5d..9f3d7f1bc6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java @@ -37,7 +37,6 @@ import io.vertx.mutiny.redis.client.Request; import io.vertx.mutiny.redis.client.Response; import io.vertx.redis.client.RedisOptions; -import io.vertx.redis.client.RedisOptionsConverter; public class RedisCheckpointStateStore implements CheckpointStateStore { @@ -83,8 +82,7 @@ public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, V JsonObject entries = JsonHelper.asJsonObject(config.config(), KafkaCommitHandler.Strategy.CHECKPOINT + "." + STATE_STORE_NAME + "."); - RedisOptions options = new RedisOptions(); - RedisOptionsConverter.fromJson(entries, options); + RedisOptions options = new RedisOptions(entries); Redis redis = Redis.createClient(vertx, options); ProcessingStateCodec stateCodec = CDIUtils.getInstanceById(stateCodecFactory, config.getChannel(), () -> { diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java index 55fbd2e018..ee946d917a 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java @@ -10,16 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import org.apache.pulsar.client.api.BatchReceivePolicy; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.DeadLetterPolicy; -import org.apache.pulsar.client.api.KeySharedPolicy; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.RedeliveryBackoff; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -40,7 +31,7 @@ import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace; import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapGetter; import io.smallrye.reactive.messaging.tracing.TracingUtils; -import io.vertx.core.impl.EventLoopContext; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -51,7 +42,12 @@ public class PulsarIncomingChannel { private final String channel; private final PulsarAckHandler ackHandler; private final PulsarFailureHandler failureHandler; - private final EventLoopContext context; + + /** + * This field captures the event loop context. + * Using {@code ContextInternal} to distinguish it from {@code io.vertx.core.Context}. + */ + private final ContextInternal context; private final AtomicBoolean closed = new AtomicBoolean(false); private final List failures = new ArrayList<>(); From 6be6aa9bbaf129cd32a2a0515161c79049c5a3fb Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Wed, 20 Dec 2023 19:00:08 +0100 Subject: [PATCH 2/3] Do not execute on the context after shutdown --- .../messaging/mqtt/session/impl/MqttClientSessionImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java index c1a202f66c..0f1f038d21 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; import io.netty.handler.codec.mqtt.MqttQoS; @@ -97,7 +98,11 @@ public Future start() { @Override public Future stop() { Promise promise = Promise.promise(); - this.vertx.runOnContext(x -> doStop(promise)); + try { + this.vertx.runOnContext(x -> doStop(promise)); + } catch (RejectedExecutionException e) { + // Vert.x has been shutdown, ignore it. + } return promise.future(); } From 80162bf675cfbf64aa5842d64470a98c40beb333 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 2 Jan 2024 10:37:57 +0100 Subject: [PATCH 3/3] Update Vert.x Mutiny Binding to version 3.8.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d0a9809e28..1ccdf00448 100644 --- a/pom.xml +++ b/pom.xml @@ -93,7 +93,7 @@ 1.31.0-alpha 1.21.0-alpha - 3.6.0 + 3.8.0 3.0.0 1.0.0