Skip to content

Commit

Permalink
Track the source of request for Kafka server (opensearch-project#4572)
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
  • Loading branch information
dinujoh authored May 31, 2024
1 parent 17790ff commit d2aa114
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
private String groupId;

@JsonProperty("client_id")
@Valid
@Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
private String clientId;

@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
Expand Down Expand Up @@ -135,6 +140,11 @@ public String getGroupId() {
return groupId;
}

/**
 * Returns the configured Kafka {@code client.id} for this buffer topic, or
 * {@code null} when not set in the pipeline configuration (presumably Kafka
 * then falls back to its default client id — confirm against consumer setup).
 */
@Override
public String getClientId() {
return clientId;
}

@Override
public Duration getCommitInterval() {
return commitInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface TopicConsumerConfig extends TopicConfig {

String getGroupId();

String getClientId();

Boolean getAutoCommit();

String getAutoOffsetReset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,19 @@ private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig,
break;
}
}
setConsumerTopicProperties(properties, topicConfig);
setConsumerTopicProperties(properties, topicConfig, topicConfig.getGroupId());
setSchemaRegistryProperties(sourceConfig, properties, topicConfig);
LOG.debug("Starting consumer with the properties : {}", properties);
return properties;
}

private void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());

public static void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig,
final String groupId) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
if (Objects.nonNull(topicConfig.getClientId())) {
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, topicConfig.getClientId());
}
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate;
import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
Expand Down Expand Up @@ -318,25 +319,7 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
}

private void setConsumerTopicProperties(Properties properties, TopicConsumerConfig topicConfig) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
topicConfig.getAutoCommit());
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
((Long) topicConfig.getCommitInterval().toMillis()).intValue());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
((Long) topicConfig.getMaxPollInterval().toMillis()).intValue());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes());
KafkaCustomConsumerFactory.setConsumerTopicProperties(properties, topicConfig, consumerGroupID);
}

private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
private String groupId;

@JsonProperty("client_id")
@Valid
@Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
private String clientId;

@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
Expand Down Expand Up @@ -121,6 +126,11 @@ public String getGroupId() {
return groupId;
}

/**
 * Returns the configured Kafka {@code client.id} for this source topic, or
 * {@code null} when the optional {@code client_id} setting is absent.
 */
@Override
public String getClientId() {
return clientId;
}

@Override
public MessageFormat getSerdeFormat() {
return serdeFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class KafkaSourceTest {
private PluginConfigObservable pluginConfigObservable;

private static final String TEST_GROUP_ID = "testGroupId";
private static final String TEST_CLIENT_ID = "testClientId";

public KafkaSource createObjectUnderTest() {
return new KafkaSource(
Expand All @@ -107,6 +108,8 @@ void setUp() throws Exception {
when(topic2.getConsumerMaxPollRecords()).thenReturn(1);
when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID);
when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID);
when(topic1.getClientId()).thenReturn(TEST_CLIENT_ID);
when(topic2.getClientId()).thenReturn(TEST_CLIENT_ID);
when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
Expand Down Expand Up @@ -156,6 +159,18 @@ void test_kafkaSource_basicFunctionality() {
assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
}

/**
 * Verifies the source starts and creates a consumer when no client id is
 * configured, exercising the null-client-id branch of consumer property setup.
 */
@Test
void test_kafkaSource_basicFunctionalityWithClientIdNull() {
when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
// Override the shared setUp() stubs so BOTH topics report a null client id.
// Bug fix: the second stub previously targeted topic1 again, leaving
// topic2 returning TEST_CLIENT_ID and the null path only half covered.
when(topic1.getClientId()).thenReturn(null);
when(topic2.getClientId()).thenReturn(null);
kafkaSource = createObjectUnderTest();
assertTrue(Objects.nonNull(kafkaSource));
kafkaSource.start(buffer);
assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
}

@Test
void test_kafkaSource_retry_consumer_create() throws InterruptedException {
when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
Expand Down

0 comments on commit d2aa114

Please sign in to comment.