Skip to content

Commit

Permalink
Bump to Vert.x 4.5.1
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Dec 20, 2023
1 parent a303391 commit 1e4b505
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>

<vertx.version>4.4.5</vertx.version>
<vertx.version>4.5.1</vertx.version>
<rxjava.version>2.2.21</rxjava.version>
<cloudevent.version>1.1.0</cloudevent.version>
<weld.version>5.1.2.Final</weld.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,7 @@
import static io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.findMatchingListener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -32,13 +25,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.*;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
Expand All @@ -47,7 +34,7 @@
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
import io.smallrye.reactive.messaging.kafka.tracing.KafkaOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;

Expand All @@ -74,7 +61,12 @@ public class KafkaSource<K, V> {
private final Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers;
private final Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners;
private final ReactiveKafkaConsumer<K, V> client;
private final EventLoopContext context;

/**
* This field stores the event loop context.
* Using {@code ContextInternal} to distinguish it from the {@code Context} used by the user.
*/
private final ContextInternal context;

private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.vertx.mutiny.redis.client.Request;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.RedisOptionsConverter;

public class RedisCheckpointStateStore implements CheckpointStateStore {

Expand Down Expand Up @@ -83,8 +82,7 @@ public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, V

JsonObject entries = JsonHelper.asJsonObject(config.config(),
KafkaCommitHandler.Strategy.CHECKPOINT + "." + STATE_STORE_NAME + ".");
RedisOptions options = new RedisOptions();
RedisOptionsConverter.fromJson(entries, options);
RedisOptions options = new RedisOptions(entries);
Redis redis = Redis.createClient(vertx, options);

ProcessingStateCodec stateCodec = CDIUtils.getInstanceById(stateCodecFactory, config.getChannel(), () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand All @@ -40,7 +31,7 @@
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapGetter;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;

Expand All @@ -51,7 +42,12 @@ public class PulsarIncomingChannel<T> {
private final String channel;
private final PulsarAckHandler ackHandler;
private final PulsarFailureHandler failureHandler;
private final EventLoopContext context;

/**
* This field captures the event loop context.
* Using {@code ContextInternal} to distinguish it from {@code io.vertx.core.Context}.
*/
private final ContextInternal context;
private final AtomicBoolean closed = new AtomicBoolean(false);

private final List<Throwable> failures = new ArrayList<>();
Expand Down

0 comments on commit 1e4b505

Please sign in to comment.