Skip to content

Commit

Permalink
Merge pull request #2397 from ozangunalp/remove_deprecated_kafka_meta…
Browse files Browse the repository at this point in the history
…data

Remove deprecated Kafka metadata
  • Loading branch information
cescoffier authored Dec 6, 2023
2 parents 8b5143c + e2a37f2 commit 41110f2
Show file tree
Hide file tree
Showing 23 changed files with 52 additions and 1,584 deletions.
15 changes: 15 additions & 0 deletions smallrye-reactive-messaging-kafka/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@
"code": "java.method.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecord<K, T>::getNack()",
"justification": "Added Message metadata propagation"
},
{
"code": "java.class.removed",
"old": "class io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata<K, T>",
"justification": "Removed deprecated IncomingKafkaRecordMetadata"
},
{
"code": "java.class.removed",
"old": "interface io.smallrye.reactive.messaging.kafka.KafkaMessageMetadata<K>",
"justification": "Removed deprecated KafkaMessageMetadata"
},
{
"code": "java.class.removed",
"old": "class io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata<K>",
"justification": "Removed deprecated OutgoingKafkaRecordMetadata"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
Expand All @@ -20,13 +21,11 @@
public class IncomingKafkaRecord<K, T> implements KafkaRecord<K, T>, MetadataInjectableMessage<T> {

private Metadata metadata;
// TODO add as a normal import once we have removed IncomingKafkaRecordMetadata in this package
private final io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<K, T> kafkaMetadata;
private final IncomingKafkaRecordMetadata<K, T> kafkaMetadata;
private final KafkaCommitHandler commitHandler;
private final KafkaFailureHandler onNack;
private final T payload;

@SuppressWarnings("deprecation")
public IncomingKafkaRecord(ConsumerRecord<K, T> record,
String channel,
int index,
Expand All @@ -35,14 +34,10 @@ public IncomingKafkaRecord(ConsumerRecord<K, T> record,
boolean cloudEventEnabled,
boolean tracingEnabled) {
this.commitHandler = commitHandler;
this.kafkaMetadata = new io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<>(record, channel, index);
// TODO remove this duplication once we have removed IncomingKafkaRecordMetadata from this package
// Duplicate the metadata so old and new copies can both be found
IncomingKafkaRecordMetadata<K, T> deprecatedKafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index);
this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index);

ArrayList<Object> meta = new ArrayList<>();
meta.add(this.kafkaMetadata);
meta.add(deprecatedKafkaMetadata);
T payload = null;
boolean payloadSet = false;
if (cloudEventEnabled) {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
Expand All @@ -15,63 +14,46 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

public class OutgoingKafkaRecord<K, T> implements KafkaRecord<K, T> {

private final T value;
private final Function<Metadata, CompletionStage<Void>> ack;
private final BiFunction<Throwable, Metadata, CompletionStage<Void>> nack;
private final Metadata metadata;
// TODO Use a normal import once OutgoingKafkaRecordMetadata in this package has been removed
private final io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<K> kafkaMetadata;
private final OutgoingKafkaRecordMetadata<K> kafkaMetadata;

@SuppressWarnings("deprecation")
public OutgoingKafkaRecord(String topic, K key, T value, Instant timestamp, int partition, Headers headers,
Function<Metadata, CompletionStage<Void>> ack,
BiFunction<Throwable, Metadata, CompletionStage<Void>> nack, Metadata existingMetadata) {
kafkaMetadata = io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.<K> builder()
kafkaMetadata = OutgoingKafkaRecordMetadata.<K> builder()
.withTopic(topic)
.withKey(key)
.withTimestamp(timestamp)
.withPartition(partition)
.withHeaders(headers)
.build();
OutgoingKafkaRecordMetadata<K> deprecatedMetadata = new OutgoingKafkaRecordMetadata<>(topic, key, partition, timestamp,
headers);

Metadata metadata;
if (existingMetadata != null) {
metadata = Metadata.from(existingMetadata).with(kafkaMetadata);
this.metadata = Metadata.from(existingMetadata).with(kafkaMetadata);
} else {
metadata = Metadata.of(kafkaMetadata);
this.metadata = Metadata.of(kafkaMetadata);
}
// Add the deprecated metadata while the two exist side by side
this.metadata = Metadata.from(metadata).with(deprecatedMetadata);

this.value = value;
this.ack = ack;
this.nack = nack;
}

@SuppressWarnings({ "unchecked", "deprecation", "rawtypes" })
@SuppressWarnings({ "unchecked" })
public static <K, T> OutgoingKafkaRecord<K, T> from(Message<T> message) {
// TODO Use a normal import once we've removed the legacy version of OutgoingKafkaRecordMetadata in this package
// Also this block should work to obtain the metadata once we've removed the legacy version
// io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<K> md =
// message.getMetadata(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.class)
// .orElse(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.builder().build());

Optional<io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata> md = message
.getMetadata(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.class);
if (!md.isPresent()) {
md = message.getMetadata(OutgoingKafkaRecordMetadata.class);
}
io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<K> kafkaMetadata = md.orElse(
io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.builder().build());
// TODO - delete the above once we remove the legacy metadata

OutgoingKafkaRecordMetadata<K> kafkaMetadata = message.getMetadata(OutgoingKafkaRecordMetadata.class)
.orElse(OutgoingKafkaRecordMetadata.builder().build());
return new OutgoingKafkaRecord<>(kafkaMetadata.getTopic(), kafkaMetadata.getKey(), message.getPayload(),
kafkaMetadata.getTimestamp(), kafkaMetadata.getPartition(),
kafkaMetadata.getHeaders(), message.getAckWithMetadata(), message.getNackWithMetadata(), message.getMetadata());
kafkaMetadata.getTimestamp(), kafkaMetadata.getPartition(), kafkaMetadata.getHeaders(),
message.getAckWithMetadata(), message.getNackWithMetadata(), message.getMetadata());
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.function.Function;
Expand Down Expand Up @@ -176,15 +175,16 @@ private synchronized void reportFailure(Throwable failure) {
private Function<Message<?>, Uni<Void>> writeMessageToKafka() {
return message -> {
try {
Optional<OutgoingKafkaRecordMetadata<?>> om = getOutgoingKafkaRecordMetadata(message);
OutgoingKafkaRecordMetadata<?> outgoingMetadata = om.orElse(null);
OutgoingKafkaRecordMetadata<?> outgoingMetadata = message.getMetadata(OutgoingKafkaRecordMetadata.class)
.orElse(null);
String actualTopic = outgoingMetadata == null || outgoingMetadata.getTopic() == null ? this.topic
: outgoingMetadata.getTopic();

ProducerRecord<?, ?> record;
OutgoingCloudEventMetadata<?> ceMetadata = message.getMetadata(OutgoingCloudEventMetadata.class)
.orElse(null);
IncomingKafkaRecordMetadata<?, ?> incomingMetadata = getIncomingKafkaRecordMetadata(message).orElse(null);
IncomingKafkaRecordMetadata<?, ?> incomingMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class)
.orElse(null);

if (message.getPayload() instanceof ProducerRecord) {
record = (ProducerRecord<?, ?>) message.getPayload();
Expand Down Expand Up @@ -254,29 +254,6 @@ private boolean isRecoverable(Throwable f) {
return !NOT_RECOVERABLE.contains(f.getClass());
}

@SuppressWarnings("deprecation")
private Optional<OutgoingKafkaRecordMetadata<?>> getOutgoingKafkaRecordMetadata(Message<?> message) {
Optional<OutgoingKafkaRecordMetadata<?>> metadata = message.getMetadata(OutgoingKafkaRecordMetadata.class)
.map(x -> (OutgoingKafkaRecordMetadata<?>) x);
if (metadata.isPresent()) {
return metadata;
}
metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata.class)
.map(x -> (OutgoingKafkaRecordMetadata<?>) x);
return metadata;
}

private Optional<IncomingKafkaRecordMetadata<?, ?>> getIncomingKafkaRecordMetadata(Message<?> message) {
Optional<IncomingKafkaRecordMetadata<?, ?>> metadata = message.getMetadata(IncomingKafkaRecordMetadata.class)
.map(x -> (IncomingKafkaRecordMetadata<?, ?>) x);
if (metadata.isPresent()) {
return metadata;
}
metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata.class)
.map(x -> (IncomingKafkaRecordMetadata<?, ?>) x);
return metadata;
}

@SuppressWarnings("rawtypes")
private ProducerRecord<?, ?> getProducerRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> om,
IncomingKafkaRecordMetadata<?, ?> im, String actualTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public void testCreationOfKafkaRecordFromKeyAndValue() {
assertThat(metadata.getTopic()).isNull();
assertThat(metadata.getTimestamp()).isNull();
assertThat(metadata.getHeaders()).isEmpty();
LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
}

@Test
Expand Down Expand Up @@ -79,7 +78,6 @@ public void testCreationOfKafkaRecordFromKeyAndValueAndTopic() {
assertThat(metadata.getTopic()).isEqualTo("topic");
assertThat(metadata.getTimestamp()).isNull();
assertThat(metadata.getHeaders()).isEmpty();
LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
}

@Test
Expand All @@ -100,7 +98,6 @@ public void testCreationOfKafkaRecordWithEverything() {
assertThat(metadata.getTopic()).isEqualTo("topic");
assertThat(metadata.getTimestamp()).isEqualTo(timestamp);
assertThat(metadata.getHeaders()).isEmpty();
LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
}

@Test
Expand Down
Loading

0 comments on commit 41110f2

Please sign in to comment.