From 85e8b9a7202608b86c2ab7e3c7e6615894c363b4 Mon Sep 17 00:00:00 2001
From: Ozan Gunalp
Date: Tue, 5 Dec 2023 20:44:21 +0100
Subject: [PATCH 1/2] Removed deprecated Kafka metadata

---
 smallrye-reactive-messaging-kafka/revapi.json |  15 +
 .../messaging/kafka/IncomingKafkaRecord.java  |  11 +-
 .../kafka/IncomingKafkaRecordMetadata.java    |  20 -
 .../messaging/kafka/KafkaMessageMetadata.java |   8 -
 .../messaging/kafka/OutgoingKafkaRecord.java  |  40 +-
 .../kafka/OutgoingKafkaRecordMetadata.java    |  69 --
 .../messaging/kafka/impl/KafkaSink.java       |  31 +-
 .../messaging/kafka/KafkaRecordTest.java      |   3 -
 .../KafkaSinkWithLegacyMetadataTest.java      | 541 ---------------
 .../messaging/kafka/KafkaSourceTest.java      |   7 +-
 .../KafkaSourceWithLegacyMetadataTest.java    | 646 ------------------
 .../kafka/LegacyMetadataTestUtils.java        |  40 --
 .../kafka/MetadataPropagationTest.java        |   6 +-
 .../messaging/kafka/MultiTopicsTest.java      |  10 +-
 .../ProducerRecordWithLegacyMetadataTest.java | 129 ----
 .../kafka/commit/CommitStrategiesTest.java    |   5 +-
 .../DeprecatedCommitStrategiesTest.java       |   2 -
 .../messaging/kafka/commit/RebalanceTest.java |   2 +-
 .../KeyValueFromRecordExtractorTest.java      |   2 +-
 ...eyValueMessageFromRecordExtractorTest.java |   2 +-
 .../kafka/converters/RecordConverterTest.java |  30 -
 .../KafkaPriceMessageConsumer.java            |   2 -
 22 files changed, 46 insertions(+), 1575 deletions(-)
 delete mode 100644 smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordMetadata.java
 delete mode 100644 smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaMessageMetadata.java
 delete mode 100644 smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecordMetadata.java
 delete mode 100644 smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java
 delete mode 100644 smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java
 delete mode 100644 smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/LegacyMetadataTestUtils.java
 delete mode 100644 smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java

diff --git a/smallrye-reactive-messaging-kafka/revapi.json b/smallrye-reactive-messaging-kafka/revapi.json
index d651f0be6f..6a90c29b1a 100644
--- a/smallrye-reactive-messaging-kafka/revapi.json
+++ b/smallrye-reactive-messaging-kafka/revapi.json
@@ -114,6 +114,21 @@
       "code": "java.method.removed",
       "old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecord::getNack()",
       "justification": "Added Message metadata propagation"
+    },
+    {
+      "code": "java.class.removed",
+      "old": "class io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata",
+      "justification": "Removed deprecated IncomingKafkaRecordMetadata"
+    },
+    {
+      "code": "java.class.removed",
+      "old": "interface io.smallrye.reactive.messaging.kafka.KafkaMessageMetadata",
+      "justification": "Removed deprecated KafkaMessageMetadata"
+    },
+    {
+      "code": "java.class.removed",
+      "old": "class io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata",
+      "justification": "Removed deprecated OutgoingKafkaRecordMetadata"
     }
   ]
 }
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java
b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java index 90353f10f7..6425943ade 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java @@ -11,6 +11,7 @@ import org.eclipse.microprofile.reactive.messaging.Metadata; import io.smallrye.reactive.messaging.ce.CloudEventMetadata; +import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper; @@ -20,13 +21,11 @@ public class IncomingKafkaRecord implements KafkaRecord, MetadataInjectableMessage { private Metadata metadata; - // TODO add as a normal import once we have removed IncomingKafkaRecordMetadata in this package - private final io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata kafkaMetadata; + private final IncomingKafkaRecordMetadata kafkaMetadata; private final KafkaCommitHandler commitHandler; private final KafkaFailureHandler onNack; private final T payload; - @SuppressWarnings("deprecation") public IncomingKafkaRecord(ConsumerRecord record, String channel, int index, @@ -35,14 +34,10 @@ public IncomingKafkaRecord(ConsumerRecord record, boolean cloudEventEnabled, boolean tracingEnabled) { this.commitHandler = commitHandler; - this.kafkaMetadata = new io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<>(record, channel, index); - // TODO remove this duplication once we have removed IncomingKafkaRecordMetadata from this package - // Duplicate the metadata so old and new copies can both be found - IncomingKafkaRecordMetadata deprecatedKafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index); + this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index); ArrayList meta = new ArrayList<>(); meta.add(this.kafkaMetadata); - meta.add(deprecatedKafkaMetadata); T payload = null; boolean payloadSet = false; if (cloudEventEnabled) { diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordMetadata.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordMetadata.java deleted file mode 100644 index 41e0ac70bd..0000000000 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordMetadata.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.smallrye.reactive.messaging.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -/** - * @deprecated use {@link io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata} instead - */ -@Deprecated -public class IncomingKafkaRecordMetadata - extends io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata implements KafkaMessageMetadata { - - public IncomingKafkaRecordMetadata(ConsumerRecord record, String channelName, int index) { - super(record, channelName, index); - } - - public IncomingKafkaRecordMetadata(ConsumerRecord record, String channelName) { - super(record, channelName, -1); - } - -} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaMessageMetadata.java 
b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaMessageMetadata.java deleted file mode 100644 index bf2eb8a432..0000000000 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaMessageMetadata.java +++ /dev/null @@ -1,8 +0,0 @@ -package io.smallrye.reactive.messaging.kafka; - -/** - * @deprecated use {@link io.smallrye.reactive.messaging.kafka.api.KafkaMessageMetadata} instead - */ -@Deprecated -public interface KafkaMessageMetadata extends io.smallrye.reactive.messaging.kafka.api.KafkaMessageMetadata { -} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecord.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecord.java index 5dae9a6ca1..f0cb3792a3 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecord.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecord.java @@ -2,7 +2,6 @@ import java.nio.charset.Charset; import java.time.Instant; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; @@ -15,63 +14,46 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Metadata; +import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; + public class OutgoingKafkaRecord implements KafkaRecord { private final T value; private final Function> ack; private final BiFunction> nack; private final Metadata metadata; - // TODO Use a normal import once OutgoingKafkaRecordMetadata in this package has been removed - private final io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata kafkaMetadata; + private final OutgoingKafkaRecordMetadata kafkaMetadata; @SuppressWarnings("deprecation") public OutgoingKafkaRecord(String topic, K key, T value, Instant timestamp, int partition, Headers headers, Function> ack, BiFunction> nack, Metadata existingMetadata) { - kafkaMetadata = io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata. builder() + kafkaMetadata = OutgoingKafkaRecordMetadata. 
builder() .withTopic(topic) .withKey(key) .withTimestamp(timestamp) .withPartition(partition) .withHeaders(headers) .build(); - OutgoingKafkaRecordMetadata deprecatedMetadata = new OutgoingKafkaRecordMetadata<>(topic, key, partition, timestamp, - headers); - Metadata metadata; if (existingMetadata != null) { - metadata = Metadata.from(existingMetadata).with(kafkaMetadata); + this.metadata = Metadata.from(existingMetadata).with(kafkaMetadata); } else { - metadata = Metadata.of(kafkaMetadata); + this.metadata = Metadata.of(kafkaMetadata); } - // Add the deprecated metadata while the two exist side by side - this.metadata = Metadata.from(metadata).with(deprecatedMetadata); this.value = value; this.ack = ack; this.nack = nack; } - @SuppressWarnings({ "unchecked", "deprecation", "rawtypes" }) + @SuppressWarnings({ "unchecked" }) public static OutgoingKafkaRecord from(Message message) { - // TODO Use a normal import once we've removed the legacy version of OutgoingKafkaRecordMetadata in this package - // Also this block should work to obtain the metadata once we've removed the legacy version - // io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata md = - // message.getMetadata(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.class) - // .orElse(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.builder().build()); - - Optional md = message - .getMetadata(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.class); - if (!md.isPresent()) { - md = message.getMetadata(OutgoingKafkaRecordMetadata.class); - } - io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata kafkaMetadata = md.orElse( - io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.builder().build()); - // TODO - delete the above once we remove the legacy metadata - + OutgoingKafkaRecordMetadata kafkaMetadata = message.getMetadata(OutgoingKafkaRecordMetadata.class) + .orElse(OutgoingKafkaRecordMetadata.builder().build()); return new OutgoingKafkaRecord<>(kafkaMetadata.getTopic(), kafkaMetadata.getKey(), message.getPayload(), - kafkaMetadata.getTimestamp(), kafkaMetadata.getPartition(), - kafkaMetadata.getHeaders(), message.getAckWithMetadata(), message.getNackWithMetadata(), message.getMetadata()); + kafkaMetadata.getTimestamp(), kafkaMetadata.getPartition(), kafkaMetadata.getHeaders(), + message.getAckWithMetadata(), message.getNackWithMetadata(), message.getMetadata()); } @Override diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecordMetadata.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecordMetadata.java deleted file mode 100644 index 58dca54324..0000000000 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/OutgoingKafkaRecordMetadata.java +++ /dev/null @@ -1,69 +0,0 @@ -package io.smallrye.reactive.messaging.kafka; - -import java.time.Instant; -import java.util.List; - -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; - -/** - * @deprecated use {@link io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata} instead - */ -@Deprecated -public class OutgoingKafkaRecordMetadata extends io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata - implements KafkaMessageMetadata { - - public static OutgoingKafkaRecordMetadataBuilder builder() { - return new 
OutgoingKafkaRecordMetadataBuilder<>(); - } - - public OutgoingKafkaRecordMetadata(String topic, K key, int partition, Instant timestamp, - Headers headers) { - super(topic, key, partition, timestamp, headers); - } - - public static final class OutgoingKafkaRecordMetadataBuilder - extends io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.OutgoingKafkaRecordMetadataBuilder { - - @Override - public OutgoingKafkaRecordMetadataBuilder withTopic(String topic) { - super.withTopic(topic); - return this; - } - - @Override - public OutgoingKafkaRecordMetadataBuilder withKey(K recordKey) { - super.withKey(recordKey); - return this; - } - - @Override - public OutgoingKafkaRecordMetadataBuilder withPartition(int partition) { - super.withPartition(partition); - return this; - } - - @Override - public OutgoingKafkaRecordMetadataBuilder withTimestamp(Instant timestamp) { - super.withTimestamp(timestamp); - return this; - } - - @Override - public OutgoingKafkaRecordMetadataBuilder withHeaders(Headers headers) { - super.withHeaders(headers); - return this; - } - - @Override - public OutgoingKafkaRecordMetadataBuilder withHeaders(List headers) { - super.withHeaders(headers); - return this; - } - - public OutgoingKafkaRecordMetadata build() { - return new OutgoingKafkaRecordMetadata<>( - getTopic(), getRecordKey(), getPartition(), getTimestamp(), getHeaders()); - } - } -} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java index 12b659e238..3cb382dff0 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java @@ -8,7 +8,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Flow; import java.util.function.Function; @@ -176,15 +175,16 @@ private synchronized void reportFailure(Throwable failure) { private Function, Uni> writeMessageToKafka() { return message -> { try { - Optional> om = getOutgoingKafkaRecordMetadata(message); - OutgoingKafkaRecordMetadata outgoingMetadata = om.orElse(null); + OutgoingKafkaRecordMetadata outgoingMetadata = message.getMetadata(OutgoingKafkaRecordMetadata.class) + .orElse(null); String actualTopic = outgoingMetadata == null || outgoingMetadata.getTopic() == null ? 
this.topic : outgoingMetadata.getTopic(); ProducerRecord record; OutgoingCloudEventMetadata ceMetadata = message.getMetadata(OutgoingCloudEventMetadata.class) .orElse(null); - IncomingKafkaRecordMetadata incomingMetadata = getIncomingKafkaRecordMetadata(message).orElse(null); + IncomingKafkaRecordMetadata incomingMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class) + .orElse(null); if (message.getPayload() instanceof ProducerRecord) { record = (ProducerRecord) message.getPayload(); @@ -254,29 +254,6 @@ private boolean isRecoverable(Throwable f) { return !NOT_RECOVERABLE.contains(f.getClass()); } - @SuppressWarnings("deprecation") - private Optional> getOutgoingKafkaRecordMetadata(Message message) { - Optional> metadata = message.getMetadata(OutgoingKafkaRecordMetadata.class) - .map(x -> (OutgoingKafkaRecordMetadata) x); - if (metadata.isPresent()) { - return metadata; - } - metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata.class) - .map(x -> (OutgoingKafkaRecordMetadata) x); - return metadata; - } - - private Optional> getIncomingKafkaRecordMetadata(Message message) { - Optional> metadata = message.getMetadata(IncomingKafkaRecordMetadata.class) - .map(x -> (IncomingKafkaRecordMetadata) x); - if (metadata.isPresent()) { - return metadata; - } - metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata.class) - .map(x -> (IncomingKafkaRecordMetadata) x); - return metadata; - } - @SuppressWarnings("rawtypes") private ProducerRecord getProducerRecord(Message message, OutgoingKafkaRecordMetadata om, IncomingKafkaRecordMetadata im, String actualTopic) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaRecordTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaRecordTest.java index 8f790e5fd3..e59764ce0c 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaRecordTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaRecordTest.java @@ -33,7 +33,6 @@ public void testCreationOfKafkaRecordFromKeyAndValue() { assertThat(metadata.getTopic()).isNull(); assertThat(metadata.getTimestamp()).isNull(); assertThat(metadata.getHeaders()).isEmpty(); - LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message); } @Test @@ -79,7 +78,6 @@ public void testCreationOfKafkaRecordFromKeyAndValueAndTopic() { assertThat(metadata.getTopic()).isEqualTo("topic"); assertThat(metadata.getTimestamp()).isNull(); assertThat(metadata.getHeaders()).isEmpty(); - LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message); } @Test @@ -100,7 +98,6 @@ public void testCreationOfKafkaRecordWithEverything() { assertThat(metadata.getTopic()).isEqualTo("topic"); assertThat(metadata.getTimestamp()).isEqualTo(timestamp); assertThat(metadata.getHeaders()).isEmpty(); - LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message); } @Test diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java deleted file mode 100644 index 152b91913d..0000000000 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java +++ /dev/null @@ -1,541 +0,0 @@ 
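For reference, user code written against the remaining API looks as follows. This is a minimal sketch, not part of the patch: the class name ApiMetadataSketch and the channel/topic name "prices" are hypothetical; only calls that appear in the changes above (the api-package OutgoingKafkaRecordMetadata builder, IncomingKafkaRecordMetadata accessors, Message.addMetadata/getMetadata) are assumed.

    import java.util.concurrent.CompletionStage;

    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Message;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;

    import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
    import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

    public class ApiMetadataSketch {

        @Outgoing("prices")
        public Message<Double> produce() {
            // Attach outgoing metadata through the api builder; KafkaSink now reads
            // this single metadata entry to resolve the target topic, key and headers.
            OutgoingKafkaRecordMetadata<String> md = OutgoingKafkaRecordMetadata.<String> builder()
                    .withTopic("prices")
                    .withKey("my-key")
                    .build();
            return Message.of(12.3).addMetadata(md);
        }

        @Incoming("prices")
        public CompletionStage<Void> consume(Message<Double> msg) {
            // Only the api copy of the incoming metadata is attached to the message now;
            // the deprecated duplicate is no longer added.
            IncomingKafkaRecordMetadata<?, ?> md = msg
                    .getMetadata(IncomingKafkaRecordMetadata.class)
                    .orElseThrow(() -> new AssertionError("Kafka metadata expected"));
            System.out.println(md.getTopic() + "[" + md.getPartition() + "]@" + md.getOffset());
            return msg.ack();
        }
    }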
-package io.smallrye.reactive.messaging.kafka; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Flow; - -import jakarta.enterprise.context.ApplicationScoped; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; - -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.health.HealthReport; -import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; -import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; -import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; -import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask; -import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion; -import io.smallrye.reactive.messaging.kafka.impl.KafkaSink; -import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; - -/** - * Duplicate of {@link KafkaSinkTest} - delete once we remove the legacy - * {@link io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata} - *
<p>
- * Tests not using metadata have been removed - */ -public class KafkaSinkWithLegacyMetadataTest extends KafkaCompanionTestBase { - - private KafkaSink sink; - - @AfterEach - public void cleanup() { - if (sink != null) { - sink.closeQuietly(); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testSinkUsingInteger() { - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 10, Duration.ofSeconds(10)); - - MapBasedConfig config = getBaseConfig() - .with("topic", topic) - .with("value.serializer", IntegerSerializer.class.getName()) - .with("partition", 0) - .with("channel-name", "testSinkUsingInteger"); - KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); - - Flow.Subscriber> subscriber = sink.getSink(); - Multi.createFrom().range(0, 10) - .map(Message::of) - .subscribe((Flow.Subscriber>) subscriber); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - } - - @Test - @SuppressWarnings("unchecked") - public void testSinkUsingIntegerAndChannelName() { - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 10, Duration.ofSeconds(10)); - - MapBasedConfig config = getBaseConfig() - .with("channel-name", topic) - .with("value.serializer", IntegerSerializer.class.getName()) - .with("partition", 0); - KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); - - Flow.Subscriber> subscriber = sink.getSink(); - Multi.createFrom().range(0, 10) - .map(Message::of) - .subscribe((Flow.Subscriber>) subscriber); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - } - - @Test - @SuppressWarnings("unchecked") - public void testSinkUsingString() { - ConsumerTask consumed = companion.consumeStrings().fromTopics(topic, 10, Duration.ofSeconds(10)); - - MapBasedConfig config = getBaseConfig() - .with("topic", topic) - .with("value.serializer", StringSerializer.class.getName()) - .with("partition", 0) - .with("channel-name", "testSinkUsingString"); - KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); - - Flow.Subscriber> subscriber = sink.getSink(); - Multi.createFrom().range(0, 10) - .map(i -> Integer.toString(i)) - .map(Message::of) - .subscribe((Flow.Subscriber>) subscriber); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - } - - private MapBasedConfig getBaseConfig() { - return kafkaConfig() - .put("key.serializer", StringSerializer.class.getName()) - .put("acks", "1"); - } - - private MapBasedConfig getKafkaSinkConfigForProducingBean() { - KafkaMapBasedConfig config = kafkaConfig("mp.messaging.outgoing.output") - .put("topic", topic) - .put("value.serializer", IntegerSerializer.class.getName()); - return config; - } - - private MapBasedConfig getKafkaSinkConfigForMessageProducingBean() { - KafkaMapBasedConfig config = kafkaConfig("mp.messaging.outgoing.output-2") - .put("value.serializer", IntegerSerializer.class.getName()) - .put("topic", topic); - return config; - } - - private KafkaMapBasedConfig getKafkaSinkConfigForRecordProducingBean(String t) { - 
KafkaMapBasedConfig config = kafkaConfig("mp.messaging.outgoing.output-record"); - config.put("key.serializer", IntegerSerializer.class.getName()); - config.put("value.serializer", StringSerializer.class.getName()); - if (t != null) { - config.put("topic", t); - } - - return config; - } - - private KafkaMapBasedConfig getKafkaSinkConfigWithMultipleUpstreams(String t) { - KafkaMapBasedConfig config = kafkaConfig("mp.messaging.outgoing.data"); - config.put("key.serializer", StringSerializer.class.getName()); - config.put("value.serializer", IntegerSerializer.class.getName()); - config.put("merge", true); - if (t != null) { - config.put("topic", t); - } - - return config; - } - - @Test - public void testABeanProducingMessagesSentToKafka() { - runApplication(getKafkaSinkConfigForProducingBean(), ProducingBean.class); - - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 10, Duration.ofSeconds(10)); - - await().until(this::isReady); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - - HealthReport liveness = getHealth().getLiveness(); - HealthReport readiness = getHealth().getReadiness(); - - assertThat(liveness.isOk()).isTrue(); - assertThat(readiness.isOk()).isTrue(); - assertThat(liveness.getChannels()).hasSize(1); - assertThat(readiness.getChannels()).hasSize(1); - assertThat(liveness.getChannels().get(0).getChannel()).isEqualTo("output"); - assertThat(readiness.getChannels().get(0).getChannel()).isEqualTo("output"); - - KafkaClientService service = get(KafkaClientService.class); - assertThat(service.getProducer("output")).isNotNull(); - assertThat(service.getProducer("missing")).isNull(); - assertThatThrownBy(() -> service.getProducer(null)).isInstanceOf(NullPointerException.class); - } - - @Test - public void testABeanProducingKafkaMessagesSentToKafka() { - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 10, Duration.ofSeconds(10)); - - runApplication(getKafkaSinkConfigForMessageProducingBean(), ProducingKafkaMessageBean.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::key) - .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); - assertThat(consumed.getRecords()) - .extracting(cr -> KafkaCompanion.getHeader(cr.headers(), "count")) - .containsExactly("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); - } - - @Test - public void testABeanProducingKafkaMessagesSentToKafkaUsingAdminHealthCheck() { - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 10, Duration.ofSeconds(10)); - - runApplication(getKafkaSinkConfigForMessageProducingBean() - .with("health-readiness-topic-verification", true), - ProducingKafkaMessageBean.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::key) - .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); - assertThat(consumed.getRecords()) - .extracting(cr -> KafkaCompanion.getHeader(cr.headers(), "count")) - .containsExactly("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testInvalidPayloadType() { - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 4, Duration.ofSeconds(10)); - - 
MapBasedConfig config = getBaseConfig() - .with("topic", topic) - .with("value.serializer", IntegerSerializer.class.getName()) - .with("partition", 0) - .with("max-inflight-messages", 1L) - .with("channel-name", "my-channel") - .with("retries", 0L); // disable retry. - KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - CountKafkaCdiEvents testCdiEvents = new CountKafkaCdiEvents(); - sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); - - await().until(() -> { - HealthReport.HealthReportBuilder builder = HealthReport.builder(); - sink.isReady(builder); - return builder.build().isOk(); - }); - - List acked = new CopyOnWriteArrayList<>(); - List nacked = new CopyOnWriteArrayList<>(); - Flow.Subscriber subscriber = sink.getSink(); - Multi.createFrom().range(0, 6) - .map(i -> { - if (i == 3 || i == 5) { - return Integer.toString(i); - } - return i; - }) - .map(i -> Message.of(i, () -> { - acked.add(i); - return CompletableFuture.completedFuture(null); - }, t -> { - nacked.add(i); - return CompletableFuture.completedFuture(null); - })) - .subscribe(subscriber); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(4); // 3 and 5 are ignored. - - await().until(() -> nacked.size() >= 2); - assertThat(acked).containsExactly(0, 1, 2, 4); - assertThat(nacked).contains("3", "5"); - - assertThat(testCdiEvents.firedConsumerEvents.sum()).isEqualTo(0); - assertThat(testCdiEvents.firedProducerEvents.sum()).isEqualTo(1); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testInvalidTypeWithDefaultInflightMessages() { - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 10, Duration.ofSeconds(10)); - - MapBasedConfig config = getBaseConfig() - .with("topic", topic) - .with("value.serializer", IntegerSerializer.class.getName()) - .with("partition", 0) - .with("retries", 0L) - .with("channel-name", "testInvalidTypeWithDefaultInflightMessages"); - KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); - - Flow.Subscriber subscriber = sink.getSink(); - Multi.createFrom().range(0, 5) - .map(i -> { - if (i == 3 || i == 5) { - return Integer.toString(i); - } - return i; - }) - .map(Message::of) - .subscribe(subscriber); - - await().until(() -> consumed.count() >= 3); - // Default inflight is 5 - // 1, 2, 3, 4, 5 are sent at the same time. 
- // As 3 fails, the stream is stopped, but, 1, 2, and 4 are already sent and potentially 6 - assertThat(consumed.count()).isGreaterThanOrEqualTo(3); - } - - @Test - public void testABeanProducingMessagesUsingHeadersSentToKafka() { - ConsumerTask consumed = companion.consumeIntegers().fromTopics(topic, 10, Duration.ofSeconds(10)); - - runApplication(getKafkaSinkConfigForMessageProducingBean(), ProducingMessageWithHeaderBean.class); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::key) - .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); - assertThat(consumed.getRecords()) - .extracting(cr -> KafkaCompanion.getHeader(cr.headers(), "count")) - .containsExactly("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); - } - - @Test - public void testABeanProducingRecords() { - ConsumerTask consumed = companion.consume(Integer.class, String.class) - .fromTopics(topic, 10); - - runApplication(getKafkaSinkConfigForRecordProducingBean(topic), BeanProducingKafkaRecord.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::key) - .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::value) - .containsExactly("value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7", "value-8", - "value-9", "value-10"); - } - - @Test - public void testABeanProducingRecordsWithNullKey() { - ConsumerTask consumed = companion.consume(Integer.class, String.class) - .fromTopics(topic, 10); - - runApplication(getKafkaSinkConfigForRecordProducingBean(topic), BeanProducingKafkaRecordNoKey.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::key) - .containsExactly(null, null, null, null, null, null, null, null, null, null); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::value) - .containsExactly("value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7", "value-8", - "value-9", "value-10"); - } - - @Test - public void testABeanProducingRecordsNoValue() { - ConsumerTask consumed = companion.consume(Integer.class, String.class) - .fromTopics(topic, 10); - - runApplication(getKafkaSinkConfigForRecordProducingBean(topic), BeanProducingKafkaRecordNoValue.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::key) - .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::value) - .containsExactly(null, null, null, null, null, null, null, null, null, null); - } - - @Test - public void testABeanProducingRecordsNoValueNoKey() { - ConsumerTask consumed = companion.consume(Integer.class, String.class) - .fromTopics(topic, 10); - - runApplication(getKafkaSinkConfigForRecordProducingBean(topic), BeanProducingKafkaRecordNoValueNoKey.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed.getRecords()) - 
.extracting(ConsumerRecord::key) - .containsExactly(null, null, null, null, null, null, null, null, null, null); - assertThat(consumed.getRecords()) - .extracting(ConsumerRecord::value) - .containsExactly(null, null, null, null, null, null, null, null, null, null); - } - - @Test - public void testABeanProducingRecordsAsMessageWithKeyOverridden() { - ConsumerTask consumed = companion.consume(Integer.class, String.class) - .fromTopics(topic, 10); - - runApplication(getKafkaSinkConfigForRecordProducingBean(topic), BeanProducingKafkaRecordInMessage.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consumed.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(10); - assertThat(consumed) - .extracting(ConsumerRecord::key) - .containsExactly(100, 1, 102, 3, 104, 5, 106, 7, 108, 9); - assertThat(consumed) - .extracting(ConsumerRecord::value) - .containsExactly("value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7", "value-8", - "value-9", "value-10"); - } - - @Test - public void testConnectorWithMultipleUpstreams() { - ConsumerTask consume = companion.consumeIntegers().fromTopics(topic, 20); - - KafkaMapBasedConfig config = getKafkaSinkConfigWithMultipleUpstreams(topic); - runApplication(config, BeanWithMultipleUpstreams.class); - - await().until(this::isReady); - await().until(this::isAlive); - - assertThat(consume.awaitCompletion(Duration.ofMinutes(1)).count()).isEqualTo(20); - assertThat(consume.getRecords()) - .extracting(ConsumerRecord::value) - .contains(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19); - } - - @ApplicationScoped - public static class BeanProducingKafkaRecord { - - @Incoming("data") - @Outgoing("output-record") - public Record process(int input) { - return Record.of(input, "value-" + (input + 1)); - } - - @Outgoing("data") - public Flow.Publisher source() { - return Multi.createFrom().range(0, 10); - } - - } - - @ApplicationScoped - public static class BeanProducingKafkaRecordNoKey { - - @Incoming("data") - @Outgoing("output-record") - public Record process(int input) { - return Record.of(null, "value-" + (input + 1)); - } - - @Outgoing("data") - public Flow.Publisher source() { - return Multi.createFrom().range(0, 10); - } - - } - - @ApplicationScoped - public static class BeanProducingKafkaRecordNoValue { - - @Incoming("data") - @Outgoing("output-record") - public Record process(int input) { - return Record.of(input, null); - } - - @Outgoing("data") - public Flow.Publisher source() { - return Multi.createFrom().range(0, 10); - } - - } - - @ApplicationScoped - public static class BeanProducingKafkaRecordNoValueNoKey { - - @SuppressWarnings("unused") - @Incoming("data") - @Outgoing("output-record") - public Record process(int input) { - return Record.of(null, null); - } - - @Outgoing("data") - public Flow.Publisher source() { - return Multi.createFrom().range(0, 10); - } - - } - - @ApplicationScoped - public static class BeanProducingKafkaRecordInMessage { - - @Incoming("data") - @Outgoing("output-record") - public Message> process(Message input) { - int value = input.getPayload(); - if (value % 2 != 0) { - return input.withPayload(Record.of(value, "value-" + (value + 1))); - } else { - OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata. 
builder() - .withKey(100 + value).build(); - return input.withPayload(Record.of(value, "value-" + (value + 1))).addMetadata(metadata); - } - } - - @Outgoing("data") - public Flow.Publisher source() { - return Multi.createFrom().range(0, 10); - } - - } - - @ApplicationScoped - public static class BeanWithMultipleUpstreams { - - @Outgoing("data") - public Flow.Publisher source() { - return Multi.createFrom().range(0, 10); - } - - @Outgoing("data") - public Flow.Publisher source2() { - return Multi.createFrom().range(10, 20) - .onItem().call(x -> Uni.createFrom().voidItem().onItem().delayIt().by(Duration.ofMillis(20))); - } - - } -} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java index 02ffa3a560..8cdfd29ab5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java @@ -38,6 +38,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.test.AssertSubscriber; import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; import io.smallrye.reactive.messaging.kafka.api.KafkaMetadataUtil; import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; @@ -530,16 +531,14 @@ public void testABeanConsumingTheKafkaMessagesWithRawMessage() { List> messages = bean.getKafkaMessages(); messages.forEach(m -> { - // TODO Import normally once the deprecateed copy in this package has gone - io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata metadata = m - .getMetadata(io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata.class) + IncomingKafkaRecordMetadata metadata = m + .getMetadata(IncomingKafkaRecordMetadata.class) .orElseThrow(() -> new AssertionError("Metadata expected")); assertThat(metadata.getTopic()).isEqualTo(topic); assertThat(metadata.getTimestamp()).isAfter(Instant.EPOCH); assertThat(metadata.getPartition()).isGreaterThan(-1); assertThat(metadata.getOffset()).isGreaterThan(-1); Assert.assertSame(metadata, KafkaMetadataUtil.readIncomingKafkaMetadata(m).get()); - LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, m); }); HealthReport liveness = getHealth().getLiveness(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java deleted file mode 100644 index 1157e63083..0000000000 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java +++ /dev/null @@ -1,646 +0,0 @@ -package io.smallrye.reactive.messaging.kafka; - -import static io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension.restart; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import 
java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Flow; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.UnsatisfiedResolutionException; -import jakarta.inject.Inject; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.eclipse.microprofile.reactive.messaging.Channel; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.jboss.weld.exceptions.DeploymentException; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; - -import io.smallrye.common.annotation.Identifier; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.test.AssertSubscriber; -import io.smallrye.reactive.messaging.health.HealthReport; -import io.smallrye.reactive.messaging.kafka.api.KafkaMetadataUtil; -import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; -import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; -import io.smallrye.reactive.messaging.kafka.base.SingletonInstance; -import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; -import io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit; -import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion; -import io.smallrye.reactive.messaging.kafka.companion.test.KafkaBrokerExtension; -import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop; -import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; -import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; -import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; -import io.strimzi.test.container.StrimziKafkaContainer; - -/** - * Duplicate of {@link KafkaSourceTest} - delete once we remove the legacy - * {@link io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata} - *
<p>
- * Tests not using metadata have been removed - */ -public class KafkaSourceWithLegacyMetadataTest extends KafkaCompanionTestBase { - - KafkaSource source; - KafkaConnector connector; - - @AfterEach - public void closing() { - if (source != null) { - source.closeQuietly(); - } - if (connector != null) { - connector.terminate(new Object()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testSource() { - MapBasedConfig config = newCommonConfigForSource() - .with("value.deserializer", IntegerDeserializer.class.getName()); - KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, - UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); - - List> messages = new ArrayList<>(); - source.getStream().subscribe().with(messages::add); - - companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, "hello", i), 10); - - await().atMost(2, TimeUnit.MINUTES).until(() -> messages.size() >= 10); - assertThat(messages.stream().map(m -> ((KafkaRecord) m).getPayload()) - .collect(Collectors.toList())).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - } - - @SuppressWarnings("unchecked") - @Test - public void testSourceWithPartitions() { - MapBasedConfig config = newCommonConfigForSource() - .with("value.deserializer", IntegerDeserializer.class.getName()) - .with("partitions", 4); - - companion.topics().createAndWait(topic, 3); - KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, - UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); - - List> messages = new ArrayList<>(); - source.getStream().subscribe().with(messages::add); - - companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 1000); - - await().atMost(2, TimeUnit.MINUTES).until(() -> messages.size() >= 1000); - - List expected = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); - // Because of partitions we cannot enforce the order. 
- assertThat(messages.stream().map(m -> ((KafkaRecord) m).getPayload()) - .collect(Collectors.toList())).containsExactlyInAnyOrderElementsOf(expected); - } - - @SuppressWarnings("rawtypes") - @Test - public void testSourceWithChannelName() { - MapBasedConfig config = newCommonConfigForSource() - .with("value.deserializer", IntegerDeserializer.class.getName()); - KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, - UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); - - List messages = new ArrayList<>(); - source.getStream().subscribe().with(messages::add); - - companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); - - await().atMost(2, TimeUnit.MINUTES).until(() -> messages.size() >= 10); - assertThat(messages.stream().map(KafkaRecord::getPayload).collect(Collectors.toList())) - .containsExactly(0, 1, 2, 3, 4, - 5, 6, 7, 8, 9); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testBroadcast() { - MapBasedConfig config = newCommonConfigForSource() - .with("value.deserializer", IntegerDeserializer.class.getName()) - .with("broadcast", true); - - CountKafkaCdiEvents testEvents = new CountKafkaCdiEvents(); - - connector = new KafkaConnector(); - connector.executionHolder = new ExecutionHolder(vertx); - connector.configurations = UnsatisfiedInstance.instance(); - connector.consumerRebalanceListeners = UnsatisfiedInstance.instance(); - connector.commitHandlerFactories = new SingletonInstance<>("throttled", - new KafkaThrottledLatestProcessedCommit.Factory()); - connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); - connector.kafkaCDIEvents = testEvents; - connector.init(); - - Flow.Publisher> publisher = (Flow.Publisher>) connector.getPublisher(config); - - AssertSubscriber messages1 = AssertSubscriber.create(Long.MAX_VALUE); - AssertSubscriber messages2 = AssertSubscriber.create(Long.MAX_VALUE); - publisher.subscribe(messages1); - publisher.subscribe(messages2); - - companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); - - await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.getItems().size() >= 10); - await().atMost(2, TimeUnit.MINUTES).until(() -> messages2.getItems().size() >= 10); - assertThat(messages1.getItems().stream().map(KafkaRecord::getPayload).collect(Collectors.toList())) - .containsExactly(0, 1, 2, 3, 4, - 5, 6, 7, 8, 9); - assertThat(messages2.getItems().stream().map(KafkaRecord::getPayload).collect(Collectors.toList())) - .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - - assertThat(testEvents.firedConsumerEvents.sum()).isEqualTo(1); - assertThat(testEvents.firedProducerEvents.sum()).isEqualTo(0); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testBroadcastWithPartitions() { - companion.topics().create(topic, 2); - MapBasedConfig config = newCommonConfigForSource() - .with("value.deserializer", IntegerDeserializer.class.getName()) - .with("broadcast", true) - .with("partitions", 2); - - connector = new KafkaConnector(); - connector.executionHolder = new ExecutionHolder(vertx); - connector.configurations = UnsatisfiedInstance.instance(); - connector.consumerRebalanceListeners = UnsatisfiedInstance.instance(); - connector.kafkaCDIEvents = new CountKafkaCdiEvents(); - connector.commitHandlerFactories = new 
SingletonInstance<>("throttled", - new KafkaThrottledLatestProcessedCommit.Factory()); - connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); - connector.init(); - - Flow.Publisher> publisher = (Flow.Publisher>) connector.getPublisher(config); - - AssertSubscriber messages1 = AssertSubscriber.create(Long.MAX_VALUE); - AssertSubscriber messages2 = AssertSubscriber.create(Long.MAX_VALUE); - publisher.subscribe(messages1); - publisher.subscribe(messages2); - - companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); - - await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.getItems().size() >= 10); - await().atMost(2, TimeUnit.MINUTES).until(() -> messages2.getItems().size() >= 10); - assertThat(messages1.getItems().stream().map(KafkaRecord::getPayload).collect(Collectors.toList())) - .containsExactlyInAnyOrder(0, 1, 2, 3, 4, - 5, 6, 7, 8, 9); - assertThat(messages2.getItems().stream().map(KafkaRecord::getPayload).collect(Collectors.toList())) - .containsExactlyInAnyOrder(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - } - - @SuppressWarnings({ "rawtypes" }) - @Test - @Tag(TestTags.SLOW) - public void testRetry() { - // This test need an individual Kafka container - try (StrimziKafkaContainer kafka = KafkaBrokerExtension.createKafkaContainer()) { - kafka.start(); - await().until(kafka::isRunning); - MapBasedConfig config = newCommonConfigForSource() - .with("bootstrap.servers", kafka.getBootstrapServers()) - .with("value.deserializer", IntegerDeserializer.class.getName()) - .with("retry", true) - .with("retry-attempts", 100) - .with("retry-max-wait", 30); - - KafkaCompanion kafkaCompanion = new KafkaCompanion(kafka.getBootstrapServers()); - - KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, - UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance(), -1); - List messages1 = new ArrayList<>(); - source.getStream().subscribe().with(messages1::add); - - kafkaCompanion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); - - await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.size() >= 10); - - try (@SuppressWarnings("unused") - StrimziKafkaContainer container = restart(kafka, 2)) { - kafkaCompanion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, 10), 10); - - await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.size() >= 20); - assertThat(messages1.size()).isGreaterThanOrEqualTo(20); - } - } - } - - // @SuppressWarnings({ "rawtypes" }) - // @Test - // public void testRecoveryAfterMissedHeartbeat() throws InterruptedException { - // MapBasedConfig config = newCommonConfigForSource() - // .with("bootstrap.servers", KafkaBrokerExtension.usage.getBootstrapServers()) - // .with("value.deserializer", IntegerDeserializer.class.getName()) - // .with(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000) - // .with(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100) - // .with("retry", true) - // .with("retry-attempts", 100) - // .with("retry-max-wait", 30); - // - // usage.setBootstrapServers(KafkaBrokerExtension.usage.getBootstrapServers()); - // - // KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - // source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, - // UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, - // 
UnsatisfiedInstance.instance(), -1);
-    // List messages1 = new ArrayList<>();
-    // source.getStream().subscribe().with(messages1::add);
-    //
-    // AtomicInteger counter = new AtomicInteger();
-    // usage.produceIntegers(10, null,
-    // () -> new ProducerRecord<>(topic, counter.getAndIncrement()))).start();
-    //
-    // await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.size() >= 10);
-    //
-    // KafkaBrokerExtension.getProxy().setConnectionCut(true);
-    // Thread.sleep(6000 + 500); // session timeout + a bit more just in case.
-    // KafkaBrokerExtension.getProxy().setConnectionCut(false);
-    //
-    // usage.produceIntegers(10, null,
-    // () -> new ProducerRecord<>(topic, counter.getAndIncrement()))).start();
-    //
-    // await().atMost(2, TimeUnit.MINUTES).until(() -> messages1.size() >= 20);
-    // assertThat(messages1.size()).isGreaterThanOrEqualTo(20);
-    // }
-
-    private KafkaMapBasedConfig myKafkaSourceConfig(int partitions, String withConsumerRebalanceListener,
-            String group) {
-        KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.data");
-        if (group != null) {
-            config.put("group.id", group);
-        }
-        config.put("value.deserializer", IntegerDeserializer.class.getName());
-        config.put("enable.auto.commit", "false");
-        config.put("auto.offset.reset", "earliest");
-        config.put("topic", "legacy-data");
-        if (partitions > 0) {
-            config.put("partitions", Integer.toString(partitions));
-            config.put("topic", "legacy-data-" + partitions);
-        }
-        if (withConsumerRebalanceListener != null) {
-            config.put("consumer-rebalance-listener.name", withConsumerRebalanceListener);
-        }
-
-        return config;
-    }
-
-    private KafkaMapBasedConfig myKafkaSourceConfig(String topic, String group) {
-        KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.data");
-        config.put("group.id", group);
-        config.put("value.deserializer", IntegerDeserializer.class.getName());
-        config.put("enable.auto.commit", "false");
-        config.put("auto.offset.reset", "earliest");
-        config.put("topic", topic);
-        return config;
-    }
-
-    private KafkaMapBasedConfig myKafkaSourceConfigWithoutAck(String suffix, boolean shorterTimeouts) {
-        KafkaMapBasedConfig config = kafkaConfig("mp.messaging.incoming.data");
-        config.put("group.id", "my-group-starting-on-fifth-" + suffix);
-        config.put("value.deserializer", IntegerDeserializer.class.getName());
-        config.put("enable.auto.commit", "false");
-        config.put("auto.offset.reset", "earliest");
-        config.put("topic", "legacy-data-starting-on-fifth-" + suffix);
-        if (shorterTimeouts) {
-            config.put("max.poll.interval.ms", "2000");
-        }
-
-        return config;
-    }
-
-    @Test
-    public void testABeanConsumingTheKafkaMessages() {
-        String topic = UUID.randomUUID().toString();
-        String group = UUID.randomUUID().toString();
-        ConsumptionBean bean = run(myKafkaSourceConfig(topic, group));
-        List list = bean.getResults();
-        assertThat(list).isEmpty();
-        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);
-
-        await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10);
-        assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
-        List<KafkaRecord<String, Integer>> messages = bean.getKafkaMessages();
-        messages.forEach(m -> {
-            assertThat(m.getTopic()).isEqualTo(topic);
-            assertThat(m.getTimestamp()).isAfter(Instant.EPOCH);
-            assertThat(m.getPartition()).isGreaterThan(-1);
-        });
-
-        HealthReport liveness = getHealth().getLiveness();
-        HealthReport readiness = getHealth().getReadiness();
-        assertThat(liveness.isOk()).isTrue();
-        assertThat(readiness.isOk()).isTrue();
-        assertThat(liveness.getChannels()).hasSize(1);
-        assertThat(readiness.getChannels()).hasSize(1);
-        assertThat(liveness.getChannels().get(0).getChannel()).isEqualTo("data");
-        assertThat(readiness.getChannels().get(0).getChannel()).isEqualTo("data");
-
-        KafkaClientService service = get(KafkaClientService.class);
-        assertThat(service.getConsumer("data")).isNotNull();
-        assertThat(service.getConsumer("missing")).isNull();
-        assertThatThrownBy(() -> service.getConsumer(null)).isInstanceOf(NullPointerException.class);
-    }
-
-    @Test
-    public void testABeanConsumingTheKafkaMessagesMultiThread() {
-        String group = UUID.randomUUID().toString();
-        KafkaSourceTest.MultiThreadConsumer bean = runApplication(myKafkaSourceConfig(topic, group),
-                KafkaSourceTest.MultiThreadConsumer.class);
-        List list = bean.getItems();
-        assertThat(list).isEmpty();
-        bean.run();
-        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 100);
-
-        await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 100);
-    }
-
-    @Test
-    @Tag(TestTags.FLAKY)
-    public void testABeanConsumingTheKafkaMessagesWithPartitions() {
-        companion.topics().create("legacy-data-2", 2);
-        ConsumptionBean bean = run(
-                myKafkaSourceConfig(2, ConsumptionConsumerRebalanceListener.class.getSimpleName(), null));
-        List list = bean.getResults();
-        assertThat(list).isEmpty();
-        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>("legacy-data-2", i), 10);
-
-        await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10);
-        assertThat(list).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
-        List<KafkaRecord<String, Integer>> messages = bean.getKafkaMessages();
-        messages.forEach(m -> {
-            assertThat(m.getTopic()).isEqualTo("legacy-data-2");
-            assertThat(m.getTimestamp()).isAfter(Instant.EPOCH);
-            assertThat(m.getPartition()).isGreaterThan(-1);
-        });
-
-        ConsumptionConsumerRebalanceListener consumptionConsumerRebalanceListener = getConsumptionConsumerRebalanceListener();
-        assertThat(consumptionConsumerRebalanceListener.getAssigned().size()).isEqualTo(2);
-        for (int i = 0; i < 2; i++) {
-            TopicPartition partition = consumptionConsumerRebalanceListener.getAssigned().get(i);
-            assertThat(partition).isNotNull();
-            assertThat(partition.topic()).isEqualTo("legacy-data-2");
-        }
-    }
-
-    @Test
-    public void testABeanConsumingWithMissingRebalanceListenerConfiguredByName() {
-        String group = UUID.randomUUID().toString();
-        assertThatThrownBy(() -> run(myKafkaSourceConfig(0, "not exists", group)))
-                .isInstanceOf(DeploymentException.class)
-                .hasCauseInstanceOf(UnsatisfiedResolutionException.class);
-    }
-
-    @Test
-    public void testABeanConsumingTheKafkaMessagesStartingOnFifthOffsetFromLatest() {
-        companion.produceIntegers()
-                .usingGenerator(i -> new ProducerRecord<>("legacy-data-starting-on-fifth-happy-path", i), 10)
-                .awaitCompletion(Duration.ofMinutes(2));
-
-        /*
-         * Will use StartFromFifthOffsetFromLatestConsumerRebalanceListener
-         */
-        ConsumptionBeanWithoutAck bean = runWithoutAck(
-                myKafkaSourceConfigWithoutAck("happy-path", false));
-        List list = bean.getResults();
-
-        await()
-                .atMost(2, TimeUnit.MINUTES)
-                .until(() -> list.size() >= 5);
-
-        assertThat(list).containsExactly(6, 7, 8, 9, 10);
-    }
-
-    @Test
-    public void testABeanConsumingTheKafkaMessagesStartingOnFifthOffsetFromLatestThatFailsOnTheFirstAttempt() {
-        companion.produceIntegers()
-                .usingGenerator(i -> new ProducerRecord<>("legacy-data-starting-on-fifth-fail-on-first-attempt", i), 10)
-                .awaitCompletion(Duration.ofMinutes(2));
-        /*
-         * Will use StartFromFifthOffsetFromLatestConsumerRebalanceListener
-         */
-        ConsumptionBeanWithoutAck bean = runWithoutAck(
-                myKafkaSourceConfigWithoutAck("fail-on-first-attempt", false));
-        List list = bean.getResults();
-
-        await()
-                .atMost(2, TimeUnit.MINUTES)
-                .until(() -> list.size() >= 5);
-
-        assertThat(list).containsExactly(6, 7, 8, 9, 10);
-
-        assertThat(
-                getStartFromFifthOffsetFromLatestConsumerRebalanceListener(
-                        "my-group-starting-on-fifth-fail-on-first-attempt")
-                        .getRebalanceCount())
-                .isEqualTo(1);
-    }
-
-    @Test
-    public void testABeanConsumingTheKafkaMessagesStartingOnFifthOffsetFromLatestThatFailsUntilSecondRebalance() {
-        companion.produceIntegers()
-                .usingGenerator(i -> new ProducerRecord<>("legacy-data-starting-on-fifth-fail-until-second-rebalance", i), 10)
-                .awaitCompletion(Duration.ofMinutes(2));
-
-        /*
-         * Will use StartFromFifthOffsetFromLatestConsumerRebalanceListener
-         */
-        ConsumptionBeanWithoutAck bean = runWithoutAck(
-                myKafkaSourceConfigWithoutAck("fail-until-second-rebalance", false));
-        List list = bean.getResults();
-
-        await()
-                .atMost(2, TimeUnit.MINUTES)
-                .until(() -> list.size() >= 5);
-
-        // The rebalance listener failed, no retry
-        assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
-        // Failed, not called and there is no retry
-        assertThat(getStartFromFifthOffsetFromLatestConsumerRebalanceListener(
-                "my-group-starting-on-fifth-fail-until-second-rebalance").getRebalanceCount()).isEqualTo(0);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testInvalidIncomingType() {
-        MapBasedConfig config = newCommonConfigForSource()
-                .with("value.deserializer", IntegerDeserializer.class.getName());
-        KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config);
-        source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories,
-                UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1);
-
-        List<Message<?>> messages = new ArrayList<>();
-        source.getStream().subscribe().with(messages::add);
-
-        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 2);
-
-        await().atMost(2, TimeUnit.MINUTES).until(() -> messages.size() >= 2);
-        assertThat(messages.stream().map(m -> ((KafkaRecord) m).getPayload())
-                .collect(Collectors.toList())).containsExactly(0, 1);
-
-        companion.produceStrings().fromRecords(new ProducerRecord<>(topic, "hello"));
-
-        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 2);
-
-        // no other message received
-        assertThat(messages.stream().map(m -> ((KafkaRecord) m).getPayload())
-                .collect(Collectors.toList())).containsExactly(0, 1);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testABeanConsumingTheKafkaMessagesWithRawMessage() {
-        String group = UUID.randomUUID().toString();
-        ConsumptionBeanUsingRawMessage bean = runApplication(myKafkaSourceConfig(0, null, group),
-                ConsumptionBeanUsingRawMessage.class);
-        List list = bean.getResults();
-        assertThat(list).isEmpty();
-        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>("legacy-data", i), 10);
-
-        await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10);
-        assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
-        List<Message<Integer>> messages = bean.getKafkaMessages();
-        messages.forEach(m -> {
-            IncomingKafkaRecordMetadata metadata = m
-                    .getMetadata(IncomingKafkaRecordMetadata.class)
-                    .orElseThrow(() -> new AssertionError("Metadata expected"));
-            assertThat(metadata.getTopic()).isEqualTo("legacy-data");
-            assertThat(metadata.getTimestamp()).isAfter(Instant.EPOCH);
-            assertThat(metadata.getPartition()).isGreaterThan(-1);
-            assertThat(metadata.getOffset()).isGreaterThan(-1);
-
-            LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(KafkaMetadataUtil.readIncomingKafkaMetadata(m).get(), m);
-        });
-
-        HealthReport liveness = getHealth().getLiveness();
-        HealthReport readiness = getHealth().getReadiness();
-        assertThat(liveness.isOk()).isTrue();
-        assertThat(readiness.isOk()).isTrue();
-        assertThat(liveness.getChannels()).hasSize(1);
-        assertThat(readiness.getChannels()).hasSize(1);
-        assertThat(liveness.getChannels().get(0).getChannel()).isEqualTo("data");
-        assertThat(readiness.getChannels().get(0).getChannel()).isEqualTo("data");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testSourceWithEmptyOptionalConfiguration() {
-        MapBasedConfig config = newCommonConfigForSource()
-                .with("value.deserializer", IntegerDeserializer.class.getName())
-                .with("sasl.jaas.config", "") //optional configuration
-                .with("sasl.mechanism", ""); //optional configuration
-        KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config);
-        source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories,
-                UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1);
-
-        List<Message<?>> messages = new ArrayList<>();
-        source.getStream().subscribe().with(messages::add);
-
-        companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);
-
-        await().atMost(2, TimeUnit.MINUTES).until(() -> messages.size() >= 10);
-        assertThat(messages.stream().map(m -> ((KafkaRecord) m).getPayload())
-                .collect(Collectors.toList())).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-    }
-
-    private ConsumptionConsumerRebalanceListener getConsumptionConsumerRebalanceListener() {
-        return getBeanManager()
-                .createInstance()
-                .select(ConsumptionConsumerRebalanceListener.class)
-                .select(Identifier.Literal.of(ConsumptionConsumerRebalanceListener.class.getSimpleName()))
-                .get();
-
-    }
-
-    private StartFromFifthOffsetFromLatestConsumerRebalanceListener getStartFromFifthOffsetFromLatestConsumerRebalanceListener(
-            String name) {
-        return getBeanManager()
-                .createInstance()
-                .select(StartFromFifthOffsetFromLatestConsumerRebalanceListener.class)
-                .select(Identifier.Literal.of(name))
-                .get();
-
-    }
-
-    private ConsumptionBean run(KafkaMapBasedConfig config) {
-        addBeans(ConsumptionBean.class, ConsumptionConsumerRebalanceListener.class);
-        runApplication(config);
-        return get(ConsumptionBean.class);
-    }
-
-    private ConsumptionBeanWithoutAck runWithoutAck(KafkaMapBasedConfig config) {
-        addBeans(ConsumptionBeanWithoutAck.class, ConsumptionConsumerRebalanceListener.class,
-                StartFromFifthOffsetFromLatestConsumerRebalanceListener.class,
-                StartFromFifthOffsetFromLatestButFailOnFirstConsumerRebalanceListener.class,
-                StartFromFifthOffsetFromLatestButFailUntilSecondRebalanceConsumerRebalanceListener.class);
-        runApplication(config);
-        return get(ConsumptionBeanWithoutAck.class);
-    }
-
-    @ApplicationScoped
-    public static class MultiThreadConsumer {
-
-        private final List items = new CopyOnWriteArrayList<>();
-        private final List threads = new CopyOnWriteArrayList<>();
-        private final Random random = new Random();
-        ExecutorService executor = Executors.newFixedThreadPool(10);
-
-        @Inject
-        @Channel("data")
-        Multi messages;
-
-        public void run() {
-            messages
-                    .emitOn(executor)
-                    .onItem().transform(s -> {
-                        try {
-                            Thread.sleep(random.nextInt(100));
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                        }
-                        return s * -1;
-                    })
-                    .subscribe().with(it -> {
-                        items.add(it);
-                        threads.add(Thread.currentThread().getName());
-                    });
-        }
-
-        public List getItems() {
-            return items;
-        }
-
-        public List getThreads() {
-            return threads;
-        }
-    }
-}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/LegacyMetadataTestUtils.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/LegacyMetadataTestUtils.java
deleted file mode 100644
index 7ebc87f8d9..0000000000
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/LegacyMetadataTestUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package io.smallrye.reactive.messaging.kafka;
-
-import org.eclipse.microprofile.reactive.messaging.Message;
-import org.junit.Assert;
-
-/**
- * Delete once we have got rid of the legacy {@link io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata}
- * and {@link IncomingKafkaRecordMetadata} implementations
- */
-@SuppressWarnings({ "deprecation", "OptionalGetWithoutIsPresent" })
-public class LegacyMetadataTestUtils {
-
-    public static void tempCompareLegacyAndApiMetadata(
-            io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata api, Message msg) {
-        IncomingKafkaRecordMetadata legacy = msg
-                .getMetadata(IncomingKafkaRecordMetadata.class)
-                .get();
-        tempCompareMetadata(api, legacy);
-        Assert.assertEquals(api.getOffset(), legacy.getOffset());
-        Assert.assertEquals(api.getTimestampType(), legacy.getTimestampType());
-
-    }
-
-    public static void tempCompareLegacyAndApiMetadata(
-            io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata api, Message msg) {
-        OutgoingKafkaRecordMetadata legacy = msg
-                .getMetadata(OutgoingKafkaRecordMetadata.class)
-                .get();
-        tempCompareMetadata(api, legacy);
-    }
-
-    private static void tempCompareMetadata(io.smallrye.reactive.messaging.kafka.api.KafkaMessageMetadata api,
-            KafkaMessageMetadata legacy) {
-        Assert.assertEquals(api.getKey(), legacy.getKey());
-        Assert.assertEquals(api.getTopic(), legacy.getTopic());
-        Assert.assertEquals(api.getPartition(), legacy.getPartition());
-        Assert.assertEquals(api.getTimestamp(), legacy.getTimestamp());
-        Assert.assertEquals(api.getHeaders(), legacy.getHeaders());
-    }
-}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MetadataPropagationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MetadataPropagationTest.java
index b8b7e8316b..ad40cba39b 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MetadataPropagationTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MetadataPropagationTest.java
@@ -26,6 +27,7 @@
 import org.junit.jupiter.api.Test;
 
 import io.smallrye.mutiny.Multi;
+import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
 import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
 import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
 import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
@@ -131,9 +132,8 @@ public void testFromKafkaToAppWithMetadata() {
         assertThat(bean.getMetadata()).contains(bean.getOriginal());
         AtomicBoolean foundMetadata = new AtomicBoolean(false);
         for (Object object : bean.getMetadata()) {
-            // TODO Import normally once the deprecated copy in this package has gone
-            if (object instanceof io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata) {
-                io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata incomingMetadata = (io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata) object;
+            if (object instanceof IncomingKafkaRecordMetadata) {
+                IncomingKafkaRecordMetadata incomingMetadata = (IncomingKafkaRecordMetadata) object;
                 assertThat(incomingMetadata.getKey()).isEqualTo("a-key");
                 assertThat(incomingMetadata.getTopic()).isEqualTo(topic);
                 foundMetadata.compareAndSet(false, true);
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
index e5bf910edb..e7a0511ed0 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
@@ -63,9 +63,7 @@ public void testWithThreeTopicsInConfiguration() {
         AtomicInteger top2 = new AtomicInteger();
         AtomicInteger top3 = new AtomicInteger();
         bean.getMessages().forEach(message -> {
-            // TODO Import normally once the deprecated copy in this package has gone
-            io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata record = message
-                    .getMetadata(io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata.class).orElse(null);
+            IncomingKafkaRecordMetadata record = message.getMetadata(IncomingKafkaRecordMetadata.class).orElse(null);
             assertThat(record).isNotNull();
             String topic = record.getTopic();
             if (topic.equals(topic1)) {
@@ -78,7 +76,6 @@ public void testWithThreeTopicsInConfiguration() {
                 top3.incrementAndGet();
                 assertThat(message.getPayload()).isEqualTo("bonjour");
             }
-            LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(record, message);
         });
 
         assertThat(top1.get()).isGreaterThanOrEqualTo(3);
@@ -117,8 +114,7 @@ public void testWithOnlyTwoTopicsReceiving() {
         AtomicInteger top2 = new AtomicInteger();
         AtomicInteger top3 = new AtomicInteger();
         bean.getMessages().forEach(message -> {
-            io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata record = message
-                    .getMetadata(io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata.class).orElse(null);
+            IncomingKafkaRecordMetadata record = message.getMetadata(IncomingKafkaRecordMetadata.class).orElse(null);
             assertThat(record).isNotNull();
             String topic = record.getTopic();
             if (topic.equals(topic1)) {
@@ -130,7 +126,6 @@ public void testWithOnlyTwoTopicsReceiving() {
                 top3.incrementAndGet();
                 assertThat(message.getPayload()).isEqualTo("bonjour");
             }
-            LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(record, message);
         });
 
         assertThat(top1).hasValue(3);
@@ -187,7 +182,6 @@ public void testWithPattern() {
                 top3.incrementAndGet();
                 assertThat(message.getPayload()).isEqualTo("bonjour");
             }
-            LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(record, message);
         });
 
         assertThat(top1).hasValue(3);
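The hunks above all make the same mechanical substitution: tests now read record coordinates from io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata, the supported replacement for the removed package-local class. A minimal consumer sketch of the pattern these tests exercise; the channel name "prices" and the Double payload are illustrative placeholders, not taken from this patch:

    import java.util.concurrent.CompletionStage;

    import jakarta.enterprise.context.ApplicationScoped;

    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Message;

    import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

    @ApplicationScoped
    public class PriceConsumer {

        @Incoming("prices")
        public CompletionStage<Void> consume(Message<Double> price) {
            // The api metadata exposes the same record coordinates (topic, partition,
            // offset, timestamp, key, headers) as the removed legacy class.
            price.getMetadata(IncomingKafkaRecordMetadata.class)
                    .ifPresent(meta -> System.out.printf("record %s[%d] offset=%d%n",
                            meta.getTopic(), meta.getPartition(), meta.getOffset()));
            return price.ack();
        }
    }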
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java
deleted file mode 100644
index 69e42c3ad2..0000000000
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ProducerRecordWithLegacyMetadataTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-package io.smallrye.reactive.messaging.kafka;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
-
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.stream.Collectors;
-
-import jakarta.enterprise.context.ApplicationScoped;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.eclipse.microprofile.reactive.messaging.Incoming;
-import org.eclipse.microprofile.reactive.messaging.Message;
-import org.eclipse.microprofile.reactive.messaging.Outgoing;
-import org.junit.jupiter.api.Timeout;
-import org.junitpioneer.jupiter.RetryingTest;
-
-import io.smallrye.mutiny.Multi;
-import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
-import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
-import io.smallrye.reactive.messaging.kafka.converters.ConsumerRecordConverter;
-
-/**
- * Duplicate of {@link ProducerRecordTest} - delete once we remove the legacy
- * {@link io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata}
- */
-public class ProducerRecordWithLegacyMetadataTest extends KafkaCompanionTestBase {
-
-    private static final String TOPIC_NAME_BASE = "ProducerRecord-" + UUID.randomUUID() + "-";
-
-    @RetryingTest(3)
-    @Timeout(60)
-    public void test() {
-        for (int i = 0; i < 10; i++) {
-            companion.topics().createAndWait(TOPIC_NAME_BASE + i, 1);
-        }
-
-        addBeans(ConsumerRecordConverter.class);
-        runApplication(config(), MyApp.class);
-
-        MyApp bean = get(MyApp.class);
-
-        await().until(() -> bean.received().size() >= 500);
-        List<ConsumerRecord<String, String>> messages = bean.received();
-
-        assertThat(messages).allSatisfy(consumerRecord -> {
-            assertThat(consumerRecord.key()).startsWith("key-");
-            assertThat(consumerRecord.value()).startsWith("value-");
-            assertThat(consumerRecord.topic()).startsWith(TOPIC_NAME_BASE);
-
-            assertThat(consumerRecord.headers()).allSatisfy(header -> {
-                assertThat(header.key()).startsWith("my-header-");
-                assertThat(new String(header.value(), StandardCharsets.UTF_8)).startsWith("my-header-value-");
-            });
-            assertThat(consumerRecord.headers()).noneSatisfy(header -> {
-                assertThat(header.key()).startsWith("my-other-header-");
-                assertThat(new String(header.value(), StandardCharsets.UTF_8)).startsWith("my-other-header-value-");
-            });
-        });
-
-        Set topics = messages.stream()
-                .map(ConsumerRecord::topic)
-                .collect(Collectors.toSet());
-        for (int i = 0; i < 10; i++) {
-            assertThat(topics).contains(TOPIC_NAME_BASE + i);
-        }
-    }
-
-    private KafkaMapBasedConfig config() {
-        return kafkaConfig("mp.messaging.outgoing.generated-producer")
-                .put("topic", "nonexistent-topic")
-                .put("key.serializer", StringSerializer.class.getName())
-                .put("value.serializer", StringSerializer.class.getName())
-
-                .withPrefix("mp.messaging.incoming.generated-consumer")
-                .put("topic", TOPIC_NAME_BASE + ".+")
-                .put("pattern", true)
-                .put("key.deserializer", StringDeserializer.class.getName())
-                .put("value.deserializer", StringDeserializer.class.getName())
-                .put("auto.offset.reset", "earliest");
-    }
-
-    @ApplicationScoped
-    public static class MyApp {
-        private final List<ConsumerRecord<String, String>> received = new CopyOnWriteArrayList<>();
-
-        @SuppressWarnings("deprecation")
-        @Outgoing("generated-producer")
-        public Multi<Message<ProducerRecord<String, String>>> produce() {
-            return Multi.createFrom().ticks().every(Duration.ofMillis(10)).map(tick -> {
-                int id = tick.intValue();
-                int topicId = id % 10;
-
-                Headers headersToBeUsed = new RecordHeaders()
-                        .add("my-header-" + id, ("my-header-value-" + id).getBytes(StandardCharsets.UTF_8));
-
-                Headers headersToBeLost = new RecordHeaders()
-                        .add("my-other-header-" + id, ("my-other-header-value-" + id).getBytes(StandardCharsets.UTF_8));
-
-                ProducerRecord record = new ProducerRecord<>(TOPIC_NAME_BASE + topicId, null,
-                        "key-" + id, "value-" + id, headersToBeUsed);
-
-                return Message.of(record)
-                        .addMetadata(OutgoingKafkaRecordMetadata.<String> builder()
-                                .withTopic("nonexistent-topic-" + id)
-                                .withHeaders(headersToBeLost)
-                                .build());
-            });
-        }
-
-        @Incoming("generated-consumer")
-        public void consume(ConsumerRecord msg) {
-            received.add(msg);
-        }
-
-        public List<ConsumerRecord<String, String>> received() {
-            return received;
-        }
-    }
-}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java
index ec846dee8c..1566c7e92e 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java
@@ -351,8 +351,8 @@ void testThrottledStrategyWithTooManyUnackedMessages() throws Exception {
         // Only ack the one from partition 0, and the 3 first items from partition 1.
         int count = 0;
         for (Message message : list) {
-            IncomingKafkaRecordMetadata metadata = message
-                    .getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow(() -> new Exception("metadata expected"));
+            IncomingKafkaRecordMetadata metadata = message.getMetadata(IncomingKafkaRecordMetadata.class)
+                    .orElseThrow(() -> new Exception("metadata expected"));
             if (metadata.getPartition() == 0) {
                 message.ack().toCompletableFuture().join();
             } else {
@@ -361,7 +361,6 @@ void testThrottledStrategyWithTooManyUnackedMessages() throws Exception {
                     count = count + 1;
                 }
             }
-            LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
         }
 
         AtomicReference report = new AtomicReference<>();
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java
index 2abfdbbdb1..4ccdec792e 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java
@@ -41,7 +41,6 @@
 import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
 import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
 import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
-import io.smallrye.reactive.messaging.kafka.LegacyMetadataTestUtils;
 import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
 import io.smallrye.reactive.messaging.kafka.base.WeldTestBase;
 import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
@@ -379,7 +378,6 @@ void testThrottledStrategyWithTooManyUnackedMessages() throws Exception {
                     count = count + 1;
                 }
             }
-            LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
         }
 
         AtomicReference report = new AtomicReference<>();
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java
index a6b6fd1074..c862fe8e2c 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java
@@ -19,6 +19,7 @@
 
 import io.smallrye.reactive.messaging.health.HealthReport;
 import io.smallrye.reactive.messaging.kafka.*;
+import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
 import io.smallrye.reactive.messaging.kafka.base.WeldTestBase;
 import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
 import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
@@ -93,7 +94,6 @@ void testRebalance() throws InterruptedException {
 
         list.forEach(m -> {
             // Only commit one partition
-            //noinspection deprecation
             if (m.getMetadata(IncomingKafkaRecordMetadata.class).map(IncomingKafkaRecordMetadata::getPartition)
                     .orElse(-1) == 0) {
                 m.ack().toCompletableFuture().join();
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueFromRecordExtractorTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueFromRecordExtractorTest.java
index e311230b78..6a3af4074d 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueFromRecordExtractorTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueFromRecordExtractorTest.java
@@ -24,7 +24,7 @@
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import io.smallrye.mutiny.Multi;
-import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
+import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
 import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
 import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
 import io.smallrye.reactive.messaging.keyed.Keyed;
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueMessageFromRecordExtractorTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueMessageFromRecordExtractorTest.java
index a9e0c0ebb5..52ed455295 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueMessageFromRecordExtractorTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/KeyValueMessageFromRecordExtractorTest.java
@@ -24,7 +24,7 @@
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import io.smallrye.mutiny.Multi;
-import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
+import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
 import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
 import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
 import io.smallrye.reactive.messaging.keyed.Keyed;
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/RecordConverterTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/RecordConverterTest.java
index e944720f9d..5f289febdd 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/RecordConverterTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/converters/RecordConverterTest.java
@@ -52,36 +52,6 @@ public void testConverter() {
         });
     }
 
-    // TODO Delete once we got rid of the legacy metadata
-    @SuppressWarnings({ "deprecation", "unchecked" })
-    @Test
-    public void testConverterLegacy() {
-        RecordConverter converter = new RecordConverter();
-        assertThat(converter.canConvert(Message.of("foo"), Record.class)).isFalse();
-
-        io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata metadata = mock(
-                io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata.class);
-        when(metadata.getKey()).thenReturn("key");
-        Message message = Message.of("foo").addMetadata(metadata);
-        assertThat(converter.canConvert(message, Record.class)).isTrue();
-        assertThat(converter.convert(message, Record.class)).satisfies(m -> {
-            assertThat(m.getPayload()).isInstanceOf(Record.class);
-            assertThat(((Record) m.getPayload()).key()).isEqualTo("key");
-            assertThat(((Record) m.getPayload()).value()).isEqualTo("foo");
-        });
-
-        assertThat(converter.canConvert(message, KafkaRecord.class)).isFalse();
-
-        when(metadata.getKey()).thenReturn(null);
-        message = Message.of("foo").addMetadata(metadata);
-        assertThat(converter.canConvert(message, Record.class)).isTrue();
-        assertThat(converter.convert(message, Record.class)).satisfies(m -> {
-            assertThat(m.getPayload()).isInstanceOf(Record.class);
-            assertThat(((Record) m.getPayload()).key()).isNull();
-            assertThat(((Record) m.getPayload()).value()).isEqualTo("foo");
-        });
-    }
-
     @Test
     public void testBeanUsingConverter() {
         KafkaMapBasedConfig builder = kafkaConfig("mp.messaging.incoming.data");
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/documentation/KafkaPriceMessageConsumer.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/documentation/KafkaPriceMessageConsumer.java
index 3988eee47d..55bd13d990 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/documentation/KafkaPriceMessageConsumer.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/documentation/KafkaPriceMessageConsumer.java
@@ -10,7 +10,6 @@
 import org.eclipse.microprofile.reactive.messaging.Incoming;
 import org.eclipse.microprofile.reactive.messaging.Message;
 
-import io.smallrye.reactive.messaging.kafka.LegacyMetadataTestUtils;
 import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
 
 @ApplicationScoped
@@ -25,7 +24,6 @@ public CompletionStage consume(Message price) {
         list.add(price.getPayload());
         Optional metadata = price.getMetadata(IncomingKafkaRecordMetadata.class);
         metadata.orElseThrow(() -> new IllegalArgumentException("Metadata are missing"));
-        LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata.get(), price);
         // Acknowledge the incoming message (commit the offset)
         return price.ack();
     }

From e2a37f21ac3947e698fba491eb7ee63ce3a6f83e Mon Sep 17 00:00:00 2001
From: Ozan Gunalp
Date: Tue, 5 Dec 2023 20:52:11 +0100
Subject: [PATCH 2/2] Removed unnecessary test repetitions

---
 .../smallrye/reactive/messaging/kafka/MultiTopicsTest.java | 5 ++---
 .../messaging/kafka/commit/BatchCommitStrategiesTest.java  | 3 +--
 .../messaging/kafka/commit/CommitStrategiesTest.java       | 3 +--
 .../reactive/messaging/kafka/commit/RebalanceTest.java     | 4 ++--
 4 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
index e7a0511ed0..5549a68a42 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/MultiTopicsTest.java
@@ -18,7 +18,6 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.eclipse.microprofile.reactive.messaging.Incoming;
 import org.eclipse.microprofile.reactive.messaging.Message;
-import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
@@ -31,7 +30,7 @@
 @SuppressWarnings("rawtypes")
 public class MultiTopicsTest extends KafkaCompanionTestBase {
 
-    @RepeatedTest(5)
+    @Test
     public void testWithThreeTopicsInConfiguration() {
         String topic1 = UUID.randomUUID().toString();
         String topic2 = UUID.randomUUID().toString();
@@ -83,7 +82,7 @@ public void testWithThreeTopicsInConfiguration() {
         assertThat(top3.get()).isGreaterThanOrEqualTo(3);
     }
 
-    @RepeatedTest(5)
+    @Test
     public void testWithOnlyTwoTopicsReceiving() {
         String topic1 = UUID.randomUUID().toString();
         String topic2 = UUID.randomUUID().toString();
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java
index b5bde53213..d54bd46dd2 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java
@@ -19,7 +19,6 @@
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import io.smallrye.common.annotation.Identifier;
@@ -189,7 +188,7 @@ void testThrottledStrategy() {
 
     }
 
-    @RepeatedTest(10)
+    @Test
     void testThrottledStrategyWithManyRecords() {
         MapBasedConfig config = commonConfiguration()
                 .with("lazy-client", true)
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java
index 1566c7e92e..0b944cda61 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java
@@ -22,7 +22,6 @@
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import io.smallrye.common.annotation.Identifier;
@@ -225,7 +224,7 @@ void testThrottledStrategy() {
 
     }
 
-    @RepeatedTest(10)
+    @Test
     void testThrottledStrategyWithManyRecords() {
         MapBasedConfig config = commonConfiguration()
                 .with("lazy-client", true)
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java
index c862fe8e2c..c2b88e9838 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java
@@ -15,7 +15,7 @@
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 
 import io.smallrye.reactive.messaging.health.HealthReport;
 import io.smallrye.reactive.messaging.kafka.*;
@@ -47,7 +47,7 @@ void closing() {
         vertx.closeAndAwait();
     }
 
-    @RepeatedTest(10)
+    @Test
    void testRebalance() throws InterruptedException {
         String group = UUID.randomUUID().toString();
         MapBasedConfig config = commonConfiguration()
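On the outgoing side, the removal in patch 1 leaves io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata as the only metadata type for customizing produced records. A sketch of the replacement pattern, assuming an outgoing channel named "prices-out" (illustrative; the builder calls mirror those used in the deleted ProducerRecordWithLegacyMetadataTest):

    import java.time.Duration;

    import jakarta.enterprise.context.ApplicationScoped;

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.eclipse.microprofile.reactive.messaging.Message;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;

    import io.smallrye.mutiny.Multi;
    import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

    @ApplicationScoped
    public class PriceProducer {

        @Outgoing("prices-out")
        public Multi<Message<Double>> produce() {
            // Same builder calls as the removed package-local copy; only the import changes.
            return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                    .map(tick -> Message.of(tick * 1.0)
                            .addMetadata(OutgoingKafkaRecordMetadata.<String> builder()
                                    .withKey("price-" + tick)
                                    .withHeaders(new RecordHeaders()
                                            .add("source", "ticker".getBytes()))
                                    .build()));
        }
    }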