From faaf4539af738768ce8e0b379d0bf77fabd1a1e2 Mon Sep 17 00:00:00 2001 From: jabeltran Date: Wed, 19 Jul 2023 15:56:43 +0200 Subject: [PATCH 1/2] Add Kafka compression type for Kafka Metastore Listener --- CHANGELOG.md | 4 ++++ .../events/metastore/kafka/messaging/KafkaMessageSender.java | 2 ++ .../metastore/kafka/messaging/KafkaProducerProperty.java | 3 ++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 429dc007..09e67108 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## 7.3.7 - 2023-07-19 +### Added +- `compression.type` [Kafka property](https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html#compression-type) in `kafka-metastore-listener`. + ## 7.3.6 - 2022-11-14 ### Fixed - `apiary-gluesync-listener` when getting null `SortOrder` in Hive & Iceberg tables. diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageSender.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageSender.java index 9d0ce37d..1712e8c5 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageSender.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageSender.java @@ -24,6 +24,7 @@ import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.BOOTSTRAP_SERVERS; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.BUFFER_MEMORY; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.CLIENT_ID; +import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.COMPRESSION_TYPE; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.LINGER_MS; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.RETRIES; @@ -77,6 +78,7 @@ static Properties kafkaProperties(Configuration conf) { props.put(BATCH_SIZE.unprefixedKey(), intProperty(conf, BATCH_SIZE)); props.put(LINGER_MS.unprefixedKey(), longProperty(conf, LINGER_MS)); props.put(BUFFER_MEMORY.unprefixedKey(), longProperty(conf, BUFFER_MEMORY)); + props.put(COMPRESSION_TYPE.unprefixedKey(), stringProperty(conf, COMPRESSION_TYPE)); props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); return props; diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerProperty.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerProperty.java index c46a3a3b..b3e89b77 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerProperty.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerProperty.java @@ -28,7 +28,8 @@ public enum KafkaProducerProperty implements Property { BATCH_SIZE("batch.size", 16384), LINGER_MS("linger.ms", 1L), BUFFER_MEMORY("buffer.memory", 33554432L), - SERDE_CLASS("serde.class", JsonMetaStoreEventSerDe.class.getName()); + SERDE_CLASS("serde.class", JsonMetaStoreEventSerDe.class.getName()), + COMPRESSION_TYPE("compression.type", "producer"); private static final String HADOOP_CONF_PREFIX = "com.expediagroup.apiary.extensions.events.metastore.kafka.messaging."; From db553e153acbbf13a84e2f06c551668c0b5aa603 Mon Sep 17 00:00:00 2001 From: jabeltran Date: Wed, 19 Jul 2023 16:05:20 +0200 Subject: [PATCH 2/2] fix tests --- .../kafka/messaging/KafkaProducerPropertyTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerPropertyTest.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerPropertyTest.java index 9b5b3502..e6efa75f 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerPropertyTest.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-listener/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaProducerPropertyTest.java @@ -22,6 +22,7 @@ import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.BOOTSTRAP_SERVERS; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.BUFFER_MEMORY; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.CLIENT_ID; +import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.COMPRESSION_TYPE; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.LINGER_MS; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.RETRIES; @@ -40,7 +41,7 @@ private static String prefixedKey(String key) { @Test public void numberOfProperties() { - assertThat(KafkaProducerProperty.values().length).isEqualTo(10); + assertThat(KafkaProducerProperty.values().length).isEqualTo(11); } @Test @@ -115,4 +116,11 @@ public void serdeClass() { assertThat(SERDE_CLASS.defaultValue()).isEqualTo(JsonMetaStoreEventSerDe.class.getName()); } + @Test + public void compressionType() { + assertThat(COMPRESSION_TYPE.unprefixedKey()).isEqualTo("compression.type"); + assertThat(COMPRESSION_TYPE.key()).isEqualTo(prefixedKey("compression.type")); + assertThat(COMPRESSION_TYPE.defaultValue()).isEqualTo("producer"); + } + }