Skip to content

Commit

Permalink
bump kafka clients to version 3.6.2 (#1853)
Browse files Browse the repository at this point in the history
* bump kafka clients to 3.x

* remove test that depends on partitioner internals

* fix test that depends on producer internals
  • Loading branch information
moscicky authored May 6, 2024
1 parent 67255cc commit 0a6cfd7
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 45 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ allprojects {
targetCompatibility = JavaVersion.VERSION_17

project.ext.versions = [
kafka : '2.8.2',
kafka : '3.6.2',
guava : '23.0',
jackson : '2.15.2',
jersey : '3.1.2',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class FailFastLocalKafkaProducerProperties implements KafkaProducerParame

private boolean reportNodeMetricsEnabled = false;

private boolean idempotenceEnabled = false;

@Override
public Duration getMaxBlock() {
return maxBlock;
Expand Down Expand Up @@ -158,4 +160,12 @@ public Duration getDeliveryTimeout() {
public void setDeliveryTimeout(Duration deliveryTimeout) {
this.deliveryTimeout = deliveryTimeout;
}

public boolean isIdempotenceEnabled() {
return idempotenceEnabled;
}

public void setIdempotenceEnabled(boolean idempotenceEnabled) {
this.idempotenceEnabled = idempotenceEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class FailFastRemoteKafkaProducerProperties implements KafkaProducerParam

private boolean reportNodeMetricsEnabled = false;

private boolean idempotenceEnabled = false;

@Override
public Duration getMaxBlock() {
return maxBlock;
Expand Down Expand Up @@ -158,4 +160,13 @@ public Duration getDeliveryTimeout() {
public void setDeliveryTimeout(Duration deliveryTimeout) {
this.deliveryTimeout = deliveryTimeout;
}

@Override
public boolean isIdempotenceEnabled() {
return idempotenceEnabled;
}

public void setIdempotenceEnabled(boolean idempotenceEnabled) {
this.idempotenceEnabled = idempotenceEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class KafkaProducerProperties implements KafkaProducerParameters {

private boolean reportNodeMetricsEnabled = false;

private boolean idempotenceEnabled = false;

@Override
public Duration getMaxBlock() {
return maxBlock;
Expand Down Expand Up @@ -161,4 +163,12 @@ public Duration getDeliveryTimeout() {
public void setDeliveryTimeout(Duration deliveryTimeout) {
this.deliveryTimeout = deliveryTimeout;
}

public boolean isIdempotenceEnabled() {
return idempotenceEnabled;
}

public void setIdempotenceEnabled(boolean idempotenceEnabled) {
this.idempotenceEnabled = idempotenceEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,12 @@ private double findProducerMetric(Producer<K, V> producer,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
Optional<? extends Map.Entry<MetricName, ? extends Metric>> first =
producer.metrics().entrySet().stream().filter(predicate).findFirst();
double value = first.map(metricNameEntry -> metricNameEntry.getValue().value()).orElse(0.0);
return value < 0 ? 0.0 : value;
Object value = first.map(metricNameEntry -> metricNameEntry.getValue().metricValue()).orElse(0.0d);
if (value instanceof Number) {
return ((Number) value).doubleValue();
} else {
return 0.0;
}
}

private ToDoubleFunction<Producer<K, V>> producerMetric(MetricName producerMetricName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
Expand Down Expand Up @@ -114,6 +115,7 @@ private KafkaMessageSender<byte[], byte[]> sender(KafkaParameters kafkaParameter
props.put(LINGER_MS_CONFIG, (int) kafkaProducerParameters.getLinger().toMillis());
props.put(METRICS_SAMPLE_WINDOW_MS_CONFIG, (int) kafkaProducerParameters.getMetricsSampleWindow().toMillis());
props.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, kafkaProducerParameters.getMaxInflightRequestsPerConnection());
props.put(ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerParameters.isIdempotenceEnabled());
props.put(ACKS_CONFIG, acks);

if (kafkaParameters.isAuthenticationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface KafkaProducerParameters {
int getMaxInflightRequestsPerConnection();

boolean isReportNodeMetricsEnabled();

boolean isIdempotenceEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import org.apache.kafka.common.TopicPartition
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.spock.Testcontainers
import pl.allegro.tech.hermes.api.ContentType
import pl.allegro.tech.hermes.api.DeliveryType
import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.SubscriptionMode
import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.api.*
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId
import pl.allegro.tech.hermes.common.kafka.JsonToAvroMigrationKafkaNamesMapper
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper
Expand All @@ -33,6 +29,7 @@ import pl.allegro.tech.hermes.frontend.server.CachedTopicsTestHelper
import pl.allegro.tech.hermes.metrics.PathsCompiler
import pl.allegro.tech.hermes.test.helper.avro.AvroUser
import pl.allegro.tech.hermes.test.helper.builder.TopicBuilder
import pl.allegro.tech.hermes.test.helper.containers.ImageTags
import spock.lang.Shared
import spock.lang.Specification

Expand All @@ -42,10 +39,7 @@ import java.util.stream.Collectors
import static java.util.Collections.emptyList
import static java.util.Collections.emptyMap
import static java.util.concurrent.TimeUnit.MILLISECONDS
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
import static org.apache.kafka.clients.consumer.ConsumerConfig.*
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG

Expand All @@ -58,7 +52,7 @@ class LocalDatacenterMessageProducerIntegrationTest extends Specification {
BrokerLatencyReporter brokerLatencyReporter = new BrokerLatencyReporter(false, null, null, null)

@Shared
KafkaContainer kafkaContainer = new KafkaContainer()
KafkaContainer kafkaContainer = new KafkaContainer(ImageTags.confluentImagesTag())

@Shared
KafkaProducer<byte[], byte[]> leaderConfirms
Expand Down Expand Up @@ -161,34 +155,6 @@ class LocalDatacenterMessageProducerIntegrationTest extends Specification {
partitionsWithMessagesData.get(0).offset() == 10
}

def "should publish messages with random distribiution when pratition-key is not present"() {
Topic topic = createAvroTopic("pl.allegro.test.randomFoo")
Subscription subscription = createTestSubscription(topic, "test-subscription")
String kafkaTopicName = topic.getName().toString()
ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription.qualifiedName)
createTopicInKafka(kafkaTopicName, NUMBER_OF_PARTITION)
CachedTopic cachedTopic = CachedTopicsTestHelper.cachedTopic(topic)
KafkaConsumer consumer = createConsumer(consumerGroupId, kafkaTopicName)

when:
1.upto(10) {
brokerMessageProducer.send(generateAvroMessage(null), cachedTopic, null)
waitForRecordPublishing(consumer)
}

then:
consumer.close()

List<OffsetAndMetadata> partitionsWithMessagesData = adminClient
.listConsumerGroupOffsets(consumerGroupId.asString())
.partitionsToOffsetAndMetadata()
.get().values().stream()
.filter { metadata -> metadata.offset() != 0 }
.collect(Collectors.toList())

partitionsWithMessagesData.size() == NUMBER_OF_PARTITION
}

private static AvroMessage generateAvroMessage(String partitionKey) {
def avroUser = new AvroUser()
return new AvroMessage(UUID.randomUUID().toString(), avroUser.asBytes(), 0L, avroUser.compiledSchema,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pl.allegro.tech.hermes.test.helper.containers;

public class ImageTags {
static String confluentImagesTag() {
public static String confluentImagesTag() {
return System.getProperty("confluentImagesTag", "6.1.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.jayway.awaitility.Awaitility.waitAtMost;
import static java.util.stream.IntStream.range;
Expand Down Expand Up @@ -84,7 +85,7 @@ public void shouldMoveOffsetInDryRunMode() throws InterruptedException {
TestSubscriber subscriber = subscribers.createSubscriber();
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
final Subscription subscription = hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint()).build());
// we have 2 partitions, thus 4 messages to get 2 per partition
// 4 messages
publishAndConsumeMessages(messages, topic, subscriber);
Thread.sleep(2000);
final OffsetRetransmissionDate retransmissionDate = new OffsetRetransmissionDate(OffsetDateTime.now());
Expand All @@ -100,8 +101,9 @@ public void shouldMoveOffsetInDryRunMode() throws InterruptedException {
MultiDCOffsetChangeSummary summary = response.expectBody(MultiDCOffsetChangeSummary.class).returnResult().getResponseBody();

assert summary != null;
assertThat(summary.getPartitionOffsetListPerBrokerName().get(PRIMARY_KAFKA_CLUSTER_NAME).get(0).getOffset())
.isEqualTo(2);
Long offsetSum = summary.getPartitionOffsetListPerBrokerName().get(PRIMARY_KAFKA_CLUSTER_NAME).stream()
.collect(Collectors.summarizingLong(PartitionOffset::getOffset)).getSum();
assertThat(offsetSum).isEqualTo(4);
subscriber.noMessagesReceived();
}

Expand Down

0 comments on commit 0a6cfd7

Please sign in to comment.