Skip to content

Commit

Permalink
Glue registry fixes. Fixed a bug in getMSKBootstrapServers (opensearch-project#3142)
Browse files Browse the repository at this point in the history

* Glue registry fixes. Fixed a bug in getMSKBootstrapServers

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Changed default auto commit reset to earliest

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka authored Aug 11, 2023
1 parent 675c2fa commit a5c4fe2
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class TopicConfig {
static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
static final String DEFAULT_AUTO_OFFSET_RESET = "latest";
static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public <T> void consumeRecords() throws Exception {
Thread.sleep(10000);
} catch (RecordDeserializationException e) {
LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record",
e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
e.topicPartition().topic(), e.topicPartition().partition(), e.offset(), e);
topicMetrics.getNumberOfDeserializationErrors().increment();
consumer.seek(e.topicPartition(), e.offset()+1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -100,6 +98,8 @@ public class KafkaSource implements Source<Record<Event>> {
private static final String SCHEMA_TYPE = "schemaType";
private final AcknowledgementSetManager acknowledgementSetManager;
private static CachedSchemaRegistryClient schemaRegistryClient;
private GlueSchemaRegistryKafkaDeserializer glueDeserializer;
private StringDeserializer stringDeserializer;

@DataPrepperPluginConstructor
public KafkaSource(final KafkaSourceConfig sourceConfig,
Expand All @@ -110,13 +110,14 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pipelineName = pipelineDescription.getPipelineName();
this.stringDeserializer = new StringDeserializer();
shutdownInProgress = new AtomicBoolean(false);
}

@Override
public void start(Buffer<Record<Event>> buffer) {
Properties authProperties = new Properties();
KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
glueDeserializer = KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = topic.getGroupId();
KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
Expand All @@ -135,7 +136,11 @@ public void start(Buffer<Record<Event>> buffer) {
break;
case PLAINTEXT:
default:
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) {
kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
} else {
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
}
break;
}
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
Expand Down Expand Up @@ -296,7 +301,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
}

if (schemaConfig.getType() == SchemaRegistryType.AWS_GLUE) {
setPropertiesForGlueSchemaRegistry(properties);
return;
}

Expand All @@ -309,13 +313,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
}
}

// Configures the consumer Properties for AWS Glue Schema Registry deserialization:
// String keys, Glue-deserialized values, plus the AWS region and Avro record type
// (GENERIC_RECORD) required by the Glue deserializer.
// NOTE(review): this diff shows the method being removed in favor of constructing a
// GlueSchemaRegistryKafkaDeserializer instance directly (see KafkaSourceSecurityConfigurer),
// which allows passing an explicit AwsCredentialsProvider instead of class-name config.
private void setPropertiesForGlueSchemaRegistry(Properties properties) {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
properties.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
}

private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) {
MessageFormat dataFormat = topicConfig.getSerdeFormat();
schemaType = dataFormat.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import software.amazon.awssdk.services.kafka.KafkaClient;
Expand All @@ -25,9 +26,17 @@
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.regions.Region;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import software.amazon.awssdk.services.glue.model.Compatibility;

import org.slf4j.Logger;

import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
Expand Down Expand Up @@ -73,6 +82,8 @@ public class KafkaSourceSecurityConfigurer {

private static final int MAX_KAFKA_CLIENT_RETRIES = 360; // for one hour every 10 seconds

private static AwsCredentialsProvider credentialsProvider;


/*public static void setSaslPlainTextProperties(final KafkaSourceConfig kafkaSourConfig,
final Properties properties) {
Expand Down Expand Up @@ -173,7 +184,6 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu
}

public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuthConfig, final AwsConfig awsConfig, final Logger LOG) {
AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) {
String sessionName = "data-prepper-kafka-session" + UUID.randomUUID();
StsClient stsClient = StsClient.builder()
Expand Down Expand Up @@ -216,10 +226,11 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
try {
Thread.sleep(10000);
} catch (InterruptedException exp) {}
retryable = true;
} catch (Exception e) {
throw new RuntimeException("Failed to get bootstrap server information from MSK.", e);
}
} while (numRetries++ < MAX_KAFKA_CLIENT_RETRIES);
} while (retryable && numRetries++ < MAX_KAFKA_CLIENT_RETRIES);
if (Objects.isNull(result)) {
throw new RuntimeException("Failed to get bootstrap server information from MSK after trying multiple times with retryable exceptions.");
}
Expand All @@ -234,11 +245,14 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
}
}

public static void setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) {
public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) {
final AwsConfig awsConfig = sourceConfig.getAwsConfig();
final AuthConfig authConfig = sourceConfig.getAuthConfig();
final KafkaSourceConfig.EncryptionConfig encryptionConfig = sourceConfig.getEncryptionConfig();
final EncryptionType encryptionType = encryptionConfig.getType();
GlueSchemaRegistryKafkaDeserializer glueDeserializer = null;

credentialsProvider = DefaultCredentialsProvider.create();

String bootstrapServers = sourceConfig.getBootStrapServers();
AwsIamAuthConfig awsIamAuthConfig = null;
Expand Down Expand Up @@ -269,6 +283,15 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon
properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
}
}
if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) {
Map<String, Object> configs = new HashMap();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10");
configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs);
}
if (Objects.isNull(authConfig) || Objects.isNull(authConfig.getSaslAuthConfig())) {
if (encryptionType == EncryptionType.SSL) {
properties.put(SECURITY_PROTOCOL, "SSL");
Expand All @@ -278,6 +301,7 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon
throw new RuntimeException("Bootstrap servers are not specified");
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return glueDeserializer;
}
}

0 comments on commit a5c4fe2

Please sign in to comment.