Skip to content

Commit

Permalink
Config changes and support for adding different modes to put the kafka key in the event (opensearch-project#3076)
Browse files Browse the repository at this point in the history

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka authored Jul 31, 2023
1 parent b519a82 commit f809163
Show file tree
Hide file tree
Showing 14 changed files with 543 additions and 53 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class KafkaSourceMultipleAuthTypeIT {
@Mock
private PlainTextAuthConfig plainTextAuthConfig;

@Mock
private KafkaSourceConfig.EncryptionConfig encryptionConfig;

private TopicConfig jsonTopic;
private TopicConfig avroTopic;

Expand All @@ -104,6 +107,7 @@ public void setup() {
pluginMetrics = mock(PluginMetrics.class);
counter = mock(Counter.class);
buffer = mock(Buffer.class);
encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
receivedRecords = new ArrayList<>();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
pipelineDescription = mock(PipelineDescription.class);
Expand Down Expand Up @@ -139,12 +143,13 @@ public void setup() {
kafkaUsername = System.getProperty("tests.kafka.username");
kafkaPassword = System.getProperty("tests.kafka.password");
when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers);
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
}

@Test
public void TestPlainTextWithNoAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() throws Exception {
final int numRecords = 1;
when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.NONE);
when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);
when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
when(sourceConfig.getAuthConfig()).thenReturn(null);
Expand Down Expand Up @@ -193,16 +198,14 @@ public void TestPlainTextWithAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() thr
final int numRecords = 1;
authConfig = mock(AuthConfig.class);
saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class);
plainTextAuthConfig = mock(PlainTextAuthConfig.class);
when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.NONE);
when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);
when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
plainTextAuthConfig = mock(PlainTextAuthConfig.class);
when(plainTextAuthConfig.getUsername()).thenReturn(kafkaUsername);
when(plainTextAuthConfig.getPassword()).thenReturn(kafkaPassword);
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
when(authConfig.getInsecure()).thenReturn(true);
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
when(sourceConfig.getBootStrapServers()).thenReturn(saslplainBootstrapServers);
kafkaSource = createObjectUnderTest();
Expand Down Expand Up @@ -252,8 +255,8 @@ public void TestPlainTextWithNoAuthKafkaEncryptionWithNoAuthSchemaRegistry() thr
saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class);
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getSaslAuthConfig()).thenReturn(null);
when(authConfig.getInsecure()).thenReturn(true);
when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.SSL);
when(encryptionConfig.getInsecure()).thenReturn(true);
when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL);
when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(sourceConfig.getBootStrapServers()).thenReturn(sslBootstrapServers);
when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
Expand Down Expand Up @@ -306,10 +309,10 @@ public void TestPlainTextWithAuthKafkaEncryptionWithNoAuthSchemaRegistry() throw
when(plainTextAuthConfig.getUsername()).thenReturn(kafkaUsername);
when(plainTextAuthConfig.getPassword()).thenReturn(kafkaPassword);
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getInsecure()).thenReturn(true);
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.SSL);
when(encryptionConfig.getInsecure()).thenReturn(true);
when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL);
when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(sourceConfig.getBootStrapServers()).thenReturn(saslsslBootstrapServers);
when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public class MskGlueRegistryMultiTypeIT {
@Mock
private AwsConfig.AwsMskConfig awsMskConfig;

@Mock
private KafkaSourceConfig.EncryptionConfig encryptionConfig;

private KafkaSource kafkaSource;
private TopicConfig jsonTopic;
private TopicConfig avroTopic;
Expand Down Expand Up @@ -144,7 +147,7 @@ public void setup() {
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT);
when(sourceConfig.getSchemaConfig()).thenReturn(schemaConfig);
when(schemaConfig.getType()).thenReturn(SchemaRegistryType.GLUE);
when(schemaConfig.getType()).thenReturn(SchemaRegistryType.AWS_GLUE);
when(pluginMetrics.counter(anyString())).thenReturn(counter);
when(pipelineDescription.getPipelineName()).thenReturn("testPipeline");
try {
Expand Down Expand Up @@ -183,13 +186,15 @@ public void setup() {
testMskArn = System.getProperty("tests.msk.arn");
testMskRegion = System.getProperty("tests.msk.region");
when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers);
encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
}

@Test
public void TestJsonRecordConsumer() throws Exception {
final int numRecords = 1;
when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.SSL);
when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL);
when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic));
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
Expand Down Expand Up @@ -256,7 +261,7 @@ public void TestJsonRecordConsumer() throws Exception {
@Test
public void TestAvroRecordConsumer() throws Exception {
final int numRecords = 1;
when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.SSL);
when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL);
when(avroTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(sourceConfig.getTopics()).thenReturn(List.of(avroTopic));
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static class SaslAuthConfig {
@JsonProperty("oauth")
private OAuthConfig oAuthConfig;

@JsonProperty("aws_iam")
@JsonProperty("aws_msk_iam")
private AwsIamAuthConfig awsIamAuthConfig;

public AwsIamAuthConfig getAwsIamAuthConfig() {
Expand Down Expand Up @@ -61,9 +61,6 @@ public SslAuthConfig() {
@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

@JsonProperty("insecure")
private Boolean insecure = false;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}
Expand All @@ -72,10 +69,6 @@ public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}

public Boolean getInsecure() {
return insecure;
}

/*
* Currently SSL config is not supported. Commenting this for future use
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * Controls how the Kafka record key is placed into the generated event:
 * dropped entirely, stored as an event field, or stored as event metadata.
 * Deserialized from pipeline YAML via the lower-case type strings below.
 */
public enum KafkaKeyMode {
    DISCARD("discard"),
    INCLUDE_AS_FIELD("include_as_field"),
    INCLUDE_AS_METADATA("include_as_metadata");

    // Lookup table from the YAML/JSON string form to the enum constant.
    private static final Map<String, KafkaKeyMode> OPTIONS_MAP = Arrays.stream(KafkaKeyMode.values())
            .collect(Collectors.toMap(
                    value -> value.type,
                    value -> value
            ));

    private final String type;

    KafkaKeyMode(final String type) {
        this.type = type;
    }

    /**
     * Jackson creator: resolves the configured string (case-insensitively)
     * to a {@link KafkaKeyMode}.
     *
     * @param type the configured value; may be null when the property is absent
     * @return the matching mode, or null when {@code type} is null or unknown
     *         (Jackson then reports/handles the null binding)
     */
    @JsonCreator
    static KafkaKeyMode fromTypeValue(final String type) {
        if (type == null) {
            return null; // guard: original NPE'd on a null JSON value
        }
        // Locale.ROOT keeps matching locale-independent (avoids the Turkish-I problem).
        return OPTIONS_MAP.get(type.toLowerCase(Locale.ROOT));
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@
*/

public class KafkaSourceConfig {
public class EncryptionConfig {
@JsonProperty("type")
private EncryptionType type = EncryptionType.SSL;

@JsonProperty("insecure")
private boolean insecure = false;

public EncryptionType getType() {
return type;
}

public boolean getInsecure() {
return insecure;
}
}

public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30);

@JsonProperty("bootstrap_servers")
Expand All @@ -39,7 +55,7 @@ public class KafkaSourceConfig {
private AuthConfig authConfig;

@JsonProperty("encryption")
private EncryptionType encryptionType = EncryptionType.SSL;
private EncryptionConfig encryptionConfig;

@JsonProperty("aws")
@Valid
Expand Down Expand Up @@ -97,8 +113,11 @@ public AuthConfig getAuthConfig() {
return authConfig;
}

public EncryptionType getEncryptionType() {
return encryptionType;
public EncryptionConfig getEncryptionConfig() {
if (Objects.isNull(encryptionConfig)) {
return new EncryptionConfig();
}
return encryptionConfig;
}

public AwsConfig getAwsConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.stream.Collectors;

public enum SchemaRegistryType {
GLUE("glue"),
AWS_GLUE("aws_glue"),
CONFLUENT("confluent");

private static final Map<String, SchemaRegistryType> OPTIONS_MAP = Arrays.stream(SchemaRegistryType.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.kafka.configuration;

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
Expand Down Expand Up @@ -111,6 +111,9 @@ public class TopicConfig {
@Valid
private Integer fetchMinBytes = FETCH_MIN_BYTES;

@JsonProperty("key_mode")
private KafkaKeyMode kafkaKeyMode = KafkaKeyMode.INCLUDE_AS_FIELD;

@JsonProperty("retry_backoff")
private Duration retryBackoff = RETRY_BACKOFF;

Expand Down Expand Up @@ -268,4 +271,9 @@ public String getName() {
public void setName(String name) {
this.name = name;
}

public KafkaKeyMode getKafkaKeyMode() {
return kafkaKeyMode;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -200,14 +202,13 @@ public void run() {
}
}

private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord) {
private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, int partition) {
Map<String, Object> data = new HashMap<>();
Event event;
Object value = consumerRecord.value();
String key = (String)consumerRecord.key();
if (Objects.isNull(key)) {
key = DEFAULT_KEY;
}
KafkaKeyMode kafkaKeyMode = topicConfig.getKafkaKeyMode();
boolean plainTextMode = false;
try {
if (value instanceof JsonDataWithSchema) {
JsonDataWithSchema j = (JsonDataWithSchema)consumerRecord.value();
Expand All @@ -217,13 +218,37 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord) {
value = objectMapper.readValue(jsonParser, Map.class);
} else if (schema == MessageFormat.PLAINTEXT) {
value = (String)consumerRecord.value();
plainTextMode = true;
} else if (schema == MessageFormat.JSON) {
value = objectMapper.convertValue(value, Map.class);
}
} catch (Exception e){
LOG.error("Failed to parse JSON or AVRO record", e);
}
data.put(key, value);

if (!plainTextMode) {
if (!(value instanceof Map)) {
data.put(key, value);
} else {
Map<String, Object> valueMap = (Map<String, Object>)value;
if (kafkaKeyMode == KafkaKeyMode.INCLUDE_AS_FIELD) {
valueMap.put("kafka_key", key);
}
data = valueMap;
}
} else {
if (Objects.isNull(key)) {
key = DEFAULT_KEY;
}
data.put(key, value);
}
event = JacksonLog.builder().withData(data).build();
EventMetadata eventMetadata = event.getMetadata();
if (kafkaKeyMode == KafkaKeyMode.INCLUDE_AS_METADATA) {
eventMetadata.setAttribute("kafka_key", key);
}
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", partition);

return new Record<Event>(event);
}

Expand All @@ -232,7 +257,7 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
List<Record<Event>> kafkaRecords = new ArrayList<>();
List<ConsumerRecord<String, T>> partitionRecords = records.records(topicPartition);
for (ConsumerRecord<String, T> consumerRecord : partitionRecords) {
Record<Event> record = getRecord(consumerRecord);
Record<Event> record = getRecord(consumerRecord, topicPartition.partition());
if (record != null) {
// Always add record to acknowledgementSet before adding to
// buffer because another thread may take and process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private long calculateLongestThreadWaitingTime() {

private Properties getConsumerProperties(final TopicConfig topicConfig) {
Properties properties = new Properties();
KafkaSourceSecurityConfigurer.setAuthProperties(properties, sourceConfig);
KafkaSourceSecurityConfigurer.setAuthProperties(properties, sourceConfig, LOG);
/* if (isKafkaClusterExists(sourceConfig.getBootStrapServers())) {
throw new RuntimeException("Can't be able to connect to the given Kafka brokers... ");
}*/
Expand Down Expand Up @@ -300,7 +300,7 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
return;
}

if (schemaConfig.getType() == SchemaRegistryType.GLUE) {
if (schemaConfig.getType() == SchemaRegistryType.AWS_GLUE) {
setPropertiesForGlueSchemaRegistry(properties);
return;
}
Expand Down
Loading

0 comments on commit f809163

Please sign in to comment.