From d2aa114f538da2f05d887b9c1ad4b77486267776 Mon Sep 17 00:00:00 2001
From: Dinu John <86094133+dinujoh@users.noreply.github.com>
Date: Fri, 31 May 2024 09:46:28 -0500
Subject: [PATCH] Track the source of request for Kafka server (#4572)

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
---
 .../kafka/buffer/BufferTopicConfig.java       | 10 +++++++++
 .../configuration/TopicConsumerConfig.java    |  2 ++
 .../consumer/KafkaCustomConsumerFactory.java  | 11 +++++++---
 .../plugins/kafka/source/KafkaSource.java     | 21 ++-----------------
 .../kafka/source/SourceTopicConfig.java       | 10 +++++++++
 .../plugins/kafka/source/KafkaSourceTest.java | 15 +++++++++++++
 6 files changed, 47 insertions(+), 22 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java
index ee0f6557de..56377c1f22 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java
@@ -65,6 +65,11 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
     @Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
     private String groupId;
 
+    @JsonProperty("client_id")
+    @Valid
+    @Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
+    private String clientId;
+
     @JsonProperty("workers")
     @Valid
     @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
@@ -135,6 +140,11 @@ public String getGroupId() {
         return groupId;
     }
 
+    @Override
+    public String getClientId() {
+        return clientId;
+    }
+
     @Override
     public Duration getCommitInterval() {
         return commitInterval;
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java
index 0ae2126cbe..0f8de7b458 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java
@@ -16,6 +16,8 @@ public interface TopicConsumerConfig extends TopicConfig {
 
     String getGroupId();
 
+    String getClientId();
+
     Boolean getAutoCommit();
 
     String getAutoOffsetReset();
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
index e4f0529ef8..d703538e42 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
@@ -134,14 +134,19 @@ private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig,
                 break;
             }
         }
-        setConsumerTopicProperties(properties, topicConfig);
+        setConsumerTopicProperties(properties, topicConfig, topicConfig.getGroupId());
         setSchemaRegistryProperties(sourceConfig, properties, topicConfig);
         LOG.debug("Starting consumer with the properties : {}", properties);
         return properties;
     }
 
-    private void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig) {
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
+
+    public static void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig,
+                                                  final String groupId) {
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        if (Objects.nonNull(topicConfig.getClientId())) {
+            properties.put(ConsumerConfig.CLIENT_ID_CONFIG, topicConfig.getClientId());
+        }
         properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes());
         properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
         properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index 3877350d3f..6a01a91bf0 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -37,6 +37,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
+import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
 import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate;
 import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
@@ -318,25 +319,7 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
     }
 
     private void setConsumerTopicProperties(Properties properties, TopicConsumerConfig topicConfig) {
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
-        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes());
-        properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue());
-        properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue());
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                topicConfig.getAutoCommit());
-        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                ((Long) topicConfig.getCommitInterval().toMillis()).intValue());
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                topicConfig.getAutoOffsetReset());
-        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
-                topicConfig.getConsumerMaxPollRecords());
-        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
-                ((Long) topicConfig.getMaxPollInterval().toMillis()).intValue());
-        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue());
-        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue());
-        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes());
-        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
-        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes());
+        KafkaCustomConsumerFactory.setConsumerTopicProperties(properties, topicConfig, consumerGroupID);
     }
 
     private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java
index adcf030f1f..703fcded19 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java
@@ -49,6 +49,11 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig
     @Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
     private String groupId;
 
+    @JsonProperty("client_id")
+    @Valid
+    @Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
+    private String clientId;
+
     @JsonProperty("workers")
     @Valid
     @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
@@ -121,6 +126,11 @@ public String getGroupId() {
         return groupId;
     }
 
+    @Override
+    public String getClientId() {
+        return clientId;
+    }
+
     @Override
     public MessageFormat getSerdeFormat() {
         return serdeFormat;
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
index ab7b07c9b0..1503a7424d 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
@@ -82,6 +82,7 @@ class KafkaSourceTest {
     private PluginConfigObservable pluginConfigObservable;
 
     private static final String TEST_GROUP_ID = "testGroupId";
+    private static final String TEST_CLIENT_ID = "testClientId";
 
     public KafkaSource createObjectUnderTest() {
         return new KafkaSource(
@@ -107,6 +108,8 @@ void setUp() throws Exception {
         when(topic2.getConsumerMaxPollRecords()).thenReturn(1);
         when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID);
         when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID);
+        when(topic1.getClientId()).thenReturn(TEST_CLIENT_ID);
+        when(topic2.getClientId()).thenReturn(TEST_CLIENT_ID);
         when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
         when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
         when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
@@ -156,6 +159,18 @@ void test_kafkaSource_basicFunctionality() {
         assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
     }
 
+    @Test
+    void test_kafkaSource_basicFunctionalityWithClientIdNull() {
+        when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        when(topic1.getClientId()).thenReturn(null);
+        when(topic2.getClientId()).thenReturn(null);
+        kafkaSource = createObjectUnderTest();
+        assertTrue(Objects.nonNull(kafkaSource));
+        kafkaSource.start(buffer);
+        assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
+    }
+
     @Test
     void test_kafkaSource_retry_consumer_create() throws InterruptedException {
         when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
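
The refactoring above can be summarized with a minimal sketch, not part of the patch, assuming a parsed TopicConsumerConfig named topicConfig and a caller-supplied groupId; the wrapper class and method names below are illustrative only:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
    import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;

    // Illustrative wrapper around the shared static helper introduced by this patch.
    class ConsumerPropertiesSketch {
        static Properties buildConsumerProperties(final TopicConsumerConfig topicConfig, final String groupId) {
            final Properties properties = new Properties();
            // KafkaSource passes its consumerGroupID here; KafkaCustomConsumerFactory passes topicConfig.getGroupId().
            KafkaCustomConsumerFactory.setConsumerTopicProperties(properties, topicConfig, groupId);
            // group.id always comes from the supplied argument; client.id is set only when the
            // optional client_id topic setting is configured (non-null).
            assert groupId.equals(properties.get(ConsumerConfig.GROUP_ID_CONFIG));
            return properties;
        }
    }

With this change, both the kafka source and kafka buffer topic configurations accept an optional client_id (1 to 255 characters) alongside the existing group_id, and it is forwarded to the Kafka consumer as client.id so brokers can attribute requests to the Data Prepper client.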