From 7f132b8361d45610000c6cee81e1e11b489c2fa6 Mon Sep 17 00:00:00 2001 From: hwanghw Date: Fri, 11 Oct 2024 14:51:56 -0700 Subject: [PATCH 1/2] 1.4.0 release 1/ update CHANGELOG 2/ fix compile errors 3/ add SINK_PREFIX to proxy option names --- CHANGELOG | 4 +++- .../sql/connector/kinesis/CachedKinesisProducer.scala | 4 ++-- .../spark/sql/connector/kinesis/KinesisOptions.scala | 8 ++++---- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 68284e9..8cc8e59 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,7 +2,9 @@ Changelog ========= v1.4.0 ---------------------------- -- plumb through optional recordTtl param to KPL +- plumb through optional recordTtl param to KPL (issue #31) +- Allow defining the ExplicitHashKey for a Kinesis Sink (issue #23) +- Allow proxy configurations to be passed as option inputs for a sink (issue #21) v1.3.0 ---------------------------- diff --git a/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala b/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala index 4085543..3b54ca8 100644 --- a/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala +++ b/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala @@ -71,7 +71,7 @@ object CachedKinesisProducer extends Logging { KinesisOptions.DEFAULT_SINK_RECORD_MAX_BUFFERED_TIME) .toLong - val recordTTL = kinesisParams.getOrElse( + val recordTTL = producerConfiguration.getOrElse( KinesisOptions.SINK_RECORD_TTL, KinesisOptions.DEFAULT_SINK_RECORD_TTL) .toLong @@ -94,7 +94,7 @@ object CachedKinesisProducer extends Logging { com.amazonaws.auth.DefaultAWSCredentialsProviderChain.getInstance ) .setRegion(region) - .setRecordTTL(recordTTL) + .setRecordTtl(recordTTL) // check for proxy settings if (producerConfiguration.contains(KinesisOptions.PROXY_ADDRESS.toLowerCase(Locale.ROOT))) { diff --git a/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala b/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala index 9824043..3cd2f6d 100644 --- a/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala +++ b/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala @@ -182,10 +182,10 @@ object KinesisOptions { val DEFAULT_SINK_AGGREGATION: String = "true" // proxy options - val PROXY_ADDRESS: String = "proxyAddress" - val PROXY_PORT: String = "proxyPort" - val PROXY_USERNAME: String = "proxyUsername" - val PROXY_PASSWORD: String = "proxyPassword" + val PROXY_ADDRESS: String = SINK_PREFIX + "proxyAddress" + val PROXY_PORT: String = SINK_PREFIX + "proxyPort" + val PROXY_USERNAME: String = SINK_PREFIX + "proxyUsername" + val PROXY_PASSWORD: String = SINK_PREFIX + "proxyPassword" def apply(parameters: CaseInsensitiveStringMap): KinesisOptions = { From 5f6a9b0244ca1fc3245535429c7ac6b27e2daf3e Mon Sep 17 00:00:00 2001 From: hwanghw Date: Mon, 18 Nov 2024 10:51:15 -0800 Subject: [PATCH 2/2] add metrics --- pom.xml | 10 +++- .../kinesis/KinesisV2MicrobatchStream.scala | 4 +- .../kinesis/KinesisV2PartitionReader.scala | 35 ++++++++++- .../metrics/PartitionReaderMetrics.scala | 58 +++++++++++++++++++ .../metrics/ShardConsumerMetrics.scala | 53 +++++++++++++++++ .../kinesis/retrieval/ShardConsumer.scala | 7 ++- .../retrieval/efo/EfoShardSubscriber.scala | 2 +- .../polling/PollingRecordBatchPublisher.scala | 3 +- 8 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 src/main/scala/org/apache/spark/sql/connector/kinesis/metrics/PartitionReaderMetrics.scala create mode 100644 src/main/scala/org/apache/spark/sql/connector/kinesis/metrics/ShardConsumerMetrics.scala diff --git a/pom.xml b/pom.xml index 19db220..06cef87 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-streaming-sql-kinesis-connector_2.12 - 1.4.0 + 1.5.0-SNAPSHOT jar Spark Structured Streaming Kinesis Connector Connector to read from and write into Kinesis from Structured Streaming Applications @@ -34,6 +34,7 @@ UTF-8 2.23.9 0.15.8 + 4.2.15 1.8 1.8 @@ -202,7 +203,12 @@ 2.19.0 test - + + io.dropwizard.metrics + metrics-core + ${metrics.version} + +