diff --git a/pom.xml b/pom.xml index 085e8aacdd..06a3fbc38a 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ 11 11 - 4.4.5 + 4.5.1 2.2.21 1.1.0 5.1.2.Final diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index d9736b2795..06c6d35dd1 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -7,14 +7,7 @@ import static io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.findMatchingListener; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -32,13 +25,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.health.HealthReport; -import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler; -import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; -import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordBatch; -import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; -import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; -import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; -import io.smallrye.reactive.messaging.kafka.KafkaRecord; +import io.smallrye.reactive.messaging.kafka.*; import io.smallrye.reactive.messaging.kafka.commit.ContextHolder; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue; @@ -47,7 +34,7 @@ import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth; import io.smallrye.reactive.messaging.kafka.tracing.KafkaOpenTelemetryInstrumenter; import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace; -import io.vertx.core.impl.EventLoopContext; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -74,7 +61,12 @@ public class KafkaSource { private final Instance> deserializationFailureHandlers; private final Instance consumerRebalanceListeners; private final ReactiveKafkaConsumer client; - private final EventLoopContext context; + + /** + * This field stores the event loop context. + * Using {@code ContextInternal} to distinguish it from the {@code Context} used by the user. + */ + private final ContextInternal context; private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter; diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java index 16ab5dbd5d..9f3d7f1bc6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStore.java @@ -37,7 +37,6 @@ import io.vertx.mutiny.redis.client.Request; import io.vertx.mutiny.redis.client.Response; import io.vertx.redis.client.RedisOptions; -import io.vertx.redis.client.RedisOptionsConverter; public class RedisCheckpointStateStore implements CheckpointStateStore { @@ -83,8 +82,7 @@ public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, V JsonObject entries = JsonHelper.asJsonObject(config.config(), KafkaCommitHandler.Strategy.CHECKPOINT + "." + STATE_STORE_NAME + "."); - RedisOptions options = new RedisOptions(); - RedisOptionsConverter.fromJson(entries, options); + RedisOptions options = new RedisOptions(entries); Redis redis = Redis.createClient(vertx, options); ProcessingStateCodec stateCodec = CDIUtils.getInstanceById(stateCodecFactory, config.getChannel(), () -> { diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java index 55fbd2e018..ee946d917a 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java @@ -10,16 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import org.apache.pulsar.client.api.BatchReceivePolicy; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.DeadLetterPolicy; -import org.apache.pulsar.client.api.KeySharedPolicy; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.RedeliveryBackoff; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -40,7 +31,7 @@ import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace; import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapGetter; import io.smallrye.reactive.messaging.tracing.TracingUtils; -import io.vertx.core.impl.EventLoopContext; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -51,7 +42,12 @@ public class PulsarIncomingChannel { private final String channel; private final PulsarAckHandler ackHandler; private final PulsarFailureHandler failureHandler; - private final EventLoopContext context; + + /** + * This field captures the event loop context. + * Using {@code ContextInternal} to distinguish it from {@code io.vertx.core.Context}. + */ + private final ContextInternal context; private final AtomicBoolean closed = new AtomicBoolean(false); private final List failures = new ArrayList<>();