diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3562c64184..deaa2235c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ## [Unreleased]
 
+## [2.2.0] - 2017-08-29
+
+### Added
+- Enable lz4 compression type for Kafka producer
+
 ## [2.1.2] - 2017-08-24
 
 ### Fixed
diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java
index 590018798e..b0ae42e60f 100644
--- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java
+++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java
@@ -1,6 +1,9 @@
 package org.zalando.nakadi.repository.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
@@ -77,8 +80,6 @@ private static String buildBootstrapServers(final List<String> brokers) {
     private Properties buildKafkaProperties(final List<String> brokers) {
         final Properties props = new Properties();
         props.put("bootstrap.servers", buildBootstrapServers(brokers));
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         return props;
     }
 
@@ -87,27 +88,33 @@ private void updateBrokers() {
         if (kafkaProperties != null) {
             final List<String> brokers = fetchBrokers();
             if (!brokers.isEmpty()) {
-                kafkaProperties.setProperty("bootstrap.servers", buildBootstrapServers(brokers));
+                kafkaProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                        buildBootstrapServers(brokers));
             }
         }
     }
 
     public Properties getKafkaConsumerProperties() {
         final Properties properties = (Properties) kafkaProperties.clone();
-        properties.put("enable.auto.commit", kafkaSettings.getEnableAutoCommit());
-        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaSettings.getEnableAutoCommit());
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         return properties;
     }
 
     public Properties getKafkaProducerProperties() {
-        final Properties producerProps = getKafkaConsumerProperties();
-        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put("acks", "all");
-        producerProps.put("request.timeout.ms", kafkaSettings.getRequestTimeoutMs());
-        producerProps.put("batch.size", kafkaSettings.getBatchSize());
-        producerProps.put("linger.ms", kafkaSettings.getLingerMs());
+        final Properties producerProps = (Properties) kafkaProperties.clone();
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaSettings.getRequestTimeoutMs());
+        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSettings.getBatchSize());
+        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaSettings.getLingerMs());
+        producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
         return producerProps;
     }
 }
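
For context, a minimal sketch of how a producer built from these properties would pick up the new lz4 setting. The class name, topic, and bootstrap address below are illustrative assumptions; Nakadi's actual producer wiring lives elsewhere (its Kafka factory) and may differ.

// Illustrative sketch only, not part of this change: a standalone producer using
// properties equivalent to KafkaLocationManager.getKafkaProducerProperties().
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Lz4ProducerSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Normally resolved from ZooKeeper by KafkaLocationManager; hard-coded here for illustration.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // The setting added by this change: record batches are compressed with lz4 before sending.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // "example-topic" is a hypothetical topic name used only for this sketch.
            producer.send(new ProducerRecord<>("example-topic", "partition-key", "{\"event\":\"...\"}"));
        }
    }
}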