Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated Kafka metadata #2397

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions smallrye-reactive-messaging-kafka/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@
"code": "java.method.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecord<K, T>::getNack()",
"justification": "Added Message metadata propagation"
},
{
"code": "java.class.removed",
"old": "class io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata<K, T>",
"justification": "Removed deprecated IncomingKafkaRecordMetadata"
},
{
"code": "java.class.removed",
"old": "interface io.smallrye.reactive.messaging.kafka.KafkaMessageMetadata<K>",
"justification": "Removed deprecated KafkaMessageMetadata"
},
{
"code": "java.class.removed",
"old": "class io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata<K>",
"justification": "Removed deprecated OutgoingKafkaRecordMetadata"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
Expand All @@ -20,13 +21,11 @@
public class IncomingKafkaRecord<K, T> implements KafkaRecord<K, T>, MetadataInjectableMessage<T> {

private Metadata metadata;
// TODO add as a normal import once we have removed IncomingKafkaRecordMetadata in this package
private final io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<K, T> kafkaMetadata;
private final IncomingKafkaRecordMetadata<K, T> kafkaMetadata;
private final KafkaCommitHandler commitHandler;
private final KafkaFailureHandler onNack;
private final T payload;

@SuppressWarnings("deprecation")
public IncomingKafkaRecord(ConsumerRecord<K, T> record,
String channel,
int index,
Expand All @@ -35,14 +34,10 @@ public IncomingKafkaRecord(ConsumerRecord<K, T> record,
boolean cloudEventEnabled,
boolean tracingEnabled) {
this.commitHandler = commitHandler;
this.kafkaMetadata = new io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<>(record, channel, index);
// TODO remove this duplication once we have removed IncomingKafkaRecordMetadata from this package
// Duplicate the metadata so old and new copies can both be found
IncomingKafkaRecordMetadata<K, T> deprecatedKafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index);
this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index);

ArrayList<Object> meta = new ArrayList<>();
meta.add(this.kafkaMetadata);
meta.add(deprecatedKafkaMetadata);
T payload = null;
boolean payloadSet = false;
if (cloudEventEnabled) {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
Expand All @@ -15,63 +14,46 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

public class OutgoingKafkaRecord<K, T> implements KafkaRecord<K, T> {

private final T value;
private final Function<Metadata, CompletionStage<Void>> ack;
private final BiFunction<Throwable, Metadata, CompletionStage<Void>> nack;
private final Metadata metadata;
// TODO Use a normal import once OutgoingKafkaRecordMetadata in this package has been removed
private final io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<K> kafkaMetadata;
private final OutgoingKafkaRecordMetadata<K> kafkaMetadata;

@SuppressWarnings("deprecation")
public OutgoingKafkaRecord(String topic, K key, T value, Instant timestamp, int partition, Headers headers,
Function<Metadata, CompletionStage<Void>> ack,
BiFunction<Throwable, Metadata, CompletionStage<Void>> nack, Metadata existingMetadata) {
kafkaMetadata = io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.<K> builder()
kafkaMetadata = OutgoingKafkaRecordMetadata.<K> builder()
.withTopic(topic)
.withKey(key)
.withTimestamp(timestamp)
.withPartition(partition)
.withHeaders(headers)
.build();
OutgoingKafkaRecordMetadata<K> deprecatedMetadata = new OutgoingKafkaRecordMetadata<>(topic, key, partition, timestamp,
headers);

Metadata metadata;
if (existingMetadata != null) {
metadata = Metadata.from(existingMetadata).with(kafkaMetadata);
this.metadata = Metadata.from(existingMetadata).with(kafkaMetadata);
} else {
metadata = Metadata.of(kafkaMetadata);
this.metadata = Metadata.of(kafkaMetadata);
}
// Add the deprecated metadata while the two exist side by side
this.metadata = Metadata.from(metadata).with(deprecatedMetadata);

this.value = value;
this.ack = ack;
this.nack = nack;
}

@SuppressWarnings({ "unchecked", "deprecation", "rawtypes" })
@SuppressWarnings({ "unchecked" })
public static <K, T> OutgoingKafkaRecord<K, T> from(Message<T> message) {
// TODO Use a normal import once we've removed the legacy version of OutgoingKafkaRecordMetadata in this package
// Also this block should work to obtain the metadata once we've removed the legacy version
// io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<K> md =
// message.getMetadata(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.class)
// .orElse(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.builder().build());

Optional<io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata> md = message
.getMetadata(io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.class);
if (!md.isPresent()) {
md = message.getMetadata(OutgoingKafkaRecordMetadata.class);
}
io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata<K> kafkaMetadata = md.orElse(
io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata.builder().build());
// TODO - delete the above once we remove the legacy metadata

OutgoingKafkaRecordMetadata<K> kafkaMetadata = message.getMetadata(OutgoingKafkaRecordMetadata.class)
.orElse(OutgoingKafkaRecordMetadata.builder().build());
return new OutgoingKafkaRecord<>(kafkaMetadata.getTopic(), kafkaMetadata.getKey(), message.getPayload(),
kafkaMetadata.getTimestamp(), kafkaMetadata.getPartition(),
kafkaMetadata.getHeaders(), message.getAckWithMetadata(), message.getNackWithMetadata(), message.getMetadata());
kafkaMetadata.getTimestamp(), kafkaMetadata.getPartition(), kafkaMetadata.getHeaders(),
message.getAckWithMetadata(), message.getNackWithMetadata(), message.getMetadata());
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.function.Function;
Expand Down Expand Up @@ -176,15 +175,16 @@ private synchronized void reportFailure(Throwable failure) {
private Function<Message<?>, Uni<Void>> writeMessageToKafka() {
return message -> {
try {
Optional<OutgoingKafkaRecordMetadata<?>> om = getOutgoingKafkaRecordMetadata(message);
OutgoingKafkaRecordMetadata<?> outgoingMetadata = om.orElse(null);
OutgoingKafkaRecordMetadata<?> outgoingMetadata = message.getMetadata(OutgoingKafkaRecordMetadata.class)
.orElse(null);
String actualTopic = outgoingMetadata == null || outgoingMetadata.getTopic() == null ? this.topic
: outgoingMetadata.getTopic();

ProducerRecord<?, ?> record;
OutgoingCloudEventMetadata<?> ceMetadata = message.getMetadata(OutgoingCloudEventMetadata.class)
.orElse(null);
IncomingKafkaRecordMetadata<?, ?> incomingMetadata = getIncomingKafkaRecordMetadata(message).orElse(null);
IncomingKafkaRecordMetadata<?, ?> incomingMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class)
.orElse(null);

if (message.getPayload() instanceof ProducerRecord) {
record = (ProducerRecord<?, ?>) message.getPayload();
Expand Down Expand Up @@ -254,29 +254,6 @@ private boolean isRecoverable(Throwable f) {
return !NOT_RECOVERABLE.contains(f.getClass());
}

@SuppressWarnings("deprecation")
private Optional<OutgoingKafkaRecordMetadata<?>> getOutgoingKafkaRecordMetadata(Message<?> message) {
Optional<OutgoingKafkaRecordMetadata<?>> metadata = message.getMetadata(OutgoingKafkaRecordMetadata.class)
.map(x -> (OutgoingKafkaRecordMetadata<?>) x);
if (metadata.isPresent()) {
return metadata;
}
metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata.class)
.map(x -> (OutgoingKafkaRecordMetadata<?>) x);
return metadata;
}

private Optional<IncomingKafkaRecordMetadata<?, ?>> getIncomingKafkaRecordMetadata(Message<?> message) {
Optional<IncomingKafkaRecordMetadata<?, ?>> metadata = message.getMetadata(IncomingKafkaRecordMetadata.class)
.map(x -> (IncomingKafkaRecordMetadata<?, ?>) x);
if (metadata.isPresent()) {
return metadata;
}
metadata = message.getMetadata(io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata.class)
.map(x -> (IncomingKafkaRecordMetadata<?, ?>) x);
return metadata;
}

@SuppressWarnings("rawtypes")
private ProducerRecord<?, ?> getProducerRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> om,
IncomingKafkaRecordMetadata<?, ?> im, String actualTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public void testCreationOfKafkaRecordFromKeyAndValue() {
assertThat(metadata.getTopic()).isNull();
assertThat(metadata.getTimestamp()).isNull();
assertThat(metadata.getHeaders()).isEmpty();
LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
}

@Test
Expand Down Expand Up @@ -79,7 +78,6 @@ public void testCreationOfKafkaRecordFromKeyAndValueAndTopic() {
assertThat(metadata.getTopic()).isEqualTo("topic");
assertThat(metadata.getTimestamp()).isNull();
assertThat(metadata.getHeaders()).isEmpty();
LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
}

@Test
Expand All @@ -100,7 +98,6 @@ public void testCreationOfKafkaRecordWithEverything() {
assertThat(metadata.getTopic()).isEqualTo("topic");
assertThat(metadata.getTimestamp()).isEqualTo(timestamp);
assertThat(metadata.getHeaders()).isEmpty();
LegacyMetadataTestUtils.tempCompareLegacyAndApiMetadata(metadata, message);
}

@Test
Expand Down
Loading
Loading