4 changes: 3 additions & 1 deletion CHANGELOG
@@ -2,7 +2,9 @@ Changelog
=========
v1.4.0
----------------------------
- plumb through optional recordTtl param to KPL
- plumb through optional recordTtl param to KPL (issue #31)
- Allow defining the ExplicitHashKey for a Kinesis Sink (issue #23)
- Allow proxy configurations to be passed as option inputs for a sink (issue #21)

v1.3.0
----------------------------
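For context, a hedged usage sketch of how the new sink options from this release might be passed to a Structured Streaming write. The KinesisOptions constants are the ones touched in this PR; the format short name, the selected columns, and the omitted stream/region/checkpoint keys are assumptions for illustration, not confirmed by this diff.

```scala
// Hypothetical usage sketch (not part of this PR). Option keys are referenced via the
// KinesisOptions constants changed in this PR; "aws-kinesis", the column names and the
// omitted stream/region/checkpoint option keys are assumptions for illustration only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.kinesis.KinesisOptions

val spark = SparkSession.builder().appName("kinesis-sink-options-sketch").getOrCreate()

spark.readStream.format("rate").load()
  .selectExpr("CAST(value AS STRING) AS data", "CAST(value AS STRING) AS partitionKey")
  .writeStream
  .format("aws-kinesis")                                     // assumed connector short name
  .option(KinesisOptions.SINK_RECORD_TTL, "30000")           // new: forwarded to the KPL as record TTL (ms), issue #31
  .option(KinesisOptions.PROXY_ADDRESS, "proxy.example.com") // new: proxy settings for the sink, issue #21
  .option(KinesisOptions.PROXY_PORT, "8080")
  .option(KinesisOptions.PROXY_USERNAME, "user")
  .option(KinesisOptions.PROXY_PASSWORD, "secret")
  // stream name, region and checkpointLocation options omitted; see the connector docs for the exact keys
  .start()
```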
10 changes: 8 additions & 2 deletions pom.xml
@@ -20,7 +20,7 @@

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-sql-kinesis-connector_2.12</artifactId>
<version>1.4.0</version>
<version>1.5.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Spark Structured Streaming Kinesis Connector</name>
<description>Connector to read from and write into Kinesis from Structured Streaming Applications</description>
@@ -34,6 +34,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<aws.sdkv2.version>2.23.9</aws.sdkv2.version>
<aws.kpl.version>0.15.8</aws.kpl.version>
<metrics.version>4.2.15</metrics.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
@@ -202,7 +203,12 @@
<version>2.19.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
them will yield errors.
@@ -71,7 +71,7 @@ object CachedKinesisProducer extends Logging {
KinesisOptions.DEFAULT_SINK_RECORD_MAX_BUFFERED_TIME)
.toLong

val recordTTL = kinesisParams.getOrElse(
val recordTTL = producerConfiguration.getOrElse(
KinesisOptions.SINK_RECORD_TTL,
KinesisOptions.DEFAULT_SINK_RECORD_TTL)
.toLong
@@ -94,7 +94,7 @@
com.amazonaws.auth.DefaultAWSCredentialsProviderChain.getInstance
)
.setRegion(region)
.setRecordTTL(recordTTL)
.setRecordTtl(recordTTL)

// check for proxy settings
if (producerConfiguration.contains(KinesisOptions.PROXY_ADDRESS.toLowerCase(Locale.ROOT))) {
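Taken together, the two hunks above amount to the KPL configuration sketched below. This is a paraphrase, not the file's actual code: the wrapper function and the KinesisOptions import path are assumptions, while setRecordTtl, setRegion and setCredentialsProvider are the calls visible in the diff.

```scala
// Paraphrase of the producer setup after this change: recordTTL is read from
// producerConfiguration (not kinesisParams) and the KPL setter is setRecordTtl.
// The wrapper function and the KinesisOptions import path are assumptions.
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration
import org.apache.spark.sql.connector.kinesis.KinesisOptions

def buildKplConfig(producerConfiguration: Map[String, String], region: String): KinesisProducerConfiguration = {
  val recordTTL = producerConfiguration.getOrElse(
      KinesisOptions.SINK_RECORD_TTL, KinesisOptions.DEFAULT_SINK_RECORD_TTL).toLong

  new KinesisProducerConfiguration()
    .setCredentialsProvider(DefaultAWSCredentialsProviderChain.getInstance)
    .setRegion(region)
    .setRecordTtl(recordTTL) // records the KPL cannot deliver within this window (ms) are failed
}
```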
@@ -182,10 +182,10 @@ object KinesisOptions {
val DEFAULT_SINK_AGGREGATION: String = "true"

// proxy options
val PROXY_ADDRESS: String = "proxyAddress"
val PROXY_PORT: String = "proxyPort"
val PROXY_USERNAME: String = "proxyUsername"
val PROXY_PASSWORD: String = "proxyPassword"
val PROXY_ADDRESS: String = SINK_PREFIX + "proxyAddress"
val PROXY_PORT: String = SINK_PREFIX + "proxyPort"
val PROXY_USERNAME: String = SINK_PREFIX + "proxyUsername"
val PROXY_PASSWORD: String = SINK_PREFIX + "proxyPassword"

def apply(parameters: CaseInsensitiveStringMap): KinesisOptions = {

@@ -264,8 +264,8 @@ class KinesisV2MicrobatchStream (
) {
currentShardOffsets = Some(
new ShardOffsets(prevBatchId + 1, options.streamName,
latestShardInfo.filter(_.iteratorType != ShardEnd.iteratorType)
))
latestShardInfo.filter(_.iteratorType != ShardEnd.iteratorType))
)
} else {
logInfo(s"Offsets are unchanged since ${KinesisOptions.AVOID_EMPTY_BATCHES} is enabled")
}
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.kinesis.client.KinesisClientConsumer
import org.apache.spark.sql.connector.kinesis.client.KinesisClientFactory
import org.apache.spark.sql.connector.kinesis.metadata.MetadataCommitter
import org.apache.spark.sql.connector.kinesis.metadata.MetadataCommitterFactory
import org.apache.spark.sql.connector.kinesis.metrics.PartitionReaderMetrics
import org.apache.spark.sql.connector.kinesis.retrieval.DataReceiver
import org.apache.spark.sql.connector.kinesis.retrieval.KinesisUserRecord
import org.apache.spark.sql.connector.kinesis.retrieval.RecordBatchPublisher
@@ -48,6 +49,10 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator
import org.apache.spark.util.SerializableConfiguration

import scala.util.Failure
import scala.util.Success
import scala.util.Try

class KinesisV2PartitionReader (schema: StructType,
sourcePartition: KinesisV2InputPartition,
streamName: String,
@@ -67,6 +72,7 @@ class KinesisV2PartitionReader (schema: StructType,

private val kinesisShardId = sourcePartition.startShardInfo.shardId
private val startTimestamp: Long = System.currentTimeMillis
private val metricsCollector = PartitionReaderMetrics()

val kinesisStreamShard: StreamShard = StreamShard(streamName, Shard.builder().shardId(kinesisShardId).build())
val kinesisPosition: KinesisPosition = KinesisPosition.make(sourcePartition.startShardInfo.iteratorType,
@@ -148,12 +154,15 @@
val putResult = dataQueue.offer(record, dataQueueWaitTimeout.getSeconds, TimeUnit.SECONDS)

if (putResult) {
metricsCollector.enqueueRecordCounter.inc()
if (KinesisUserRecord.nonEmptyUserRecord(record)) {
updateState(streamShard, record.sequenceNumber)
} else {
metricsCollector.enqueueEmptyRecordCounter.inc()
logDebug(s"put empty record with millisBehindLatest ${record.millisBehindLatest} to ${streamShard}'s data queue'")
}
} else {
metricsCollector.enqueueFailureCounter.inc()
logWarning(s"fail to enqueue record for ${streamShard}")
}

@@ -210,11 +219,12 @@
}
} else if (KinesisUserRecord.shardEndUserRecord(userRecord)) {
logInfo(s"Got shard end user record for ${kinesisStreamShard}")
metricsCollector.shardEndUserRecordCounter.inc()
hasShardClosed.set(true)
fetchNext = false
} else if (KinesisUserRecord.emptyUserRecord(userRecord)) {
logInfo(s"Got empty user record with millisBehindLatest ${userRecord.millisBehindLatest} for ${kinesisPosition}")

metricsCollector.emptyUserRecordCounter.inc()
if (userRecord.millisBehindLatest > 0) {
// when the stream not receiving new data for a long time, there can be real data events
// after the empty ones, reset the counter
@@ -223,6 +233,7 @@
}

} else {
metricsCollector.userRecordCounter.inc()
if (userRecord.data.length > 0) {
lastEmptyCnt = 0
emptyCnt = 0
@@ -250,6 +261,7 @@
}
else {
logError(s"Got userRecord with zero data length ${userRecord}. Not supposed to reach here.")
metricsCollector.zeroLengthUserRecordCounter.inc()
fetchNext = false
}
}
@@ -315,9 +327,30 @@
underlying.next()
}

private def logMetrics(): Unit = {
Try(metricsCollector.json) match {
case Success(metricsString) =>
logInfo(s"Partition Reader log metrics for ${kinesisStreamShard}: ${metricsString}")
case Failure(e) =>
logError("failed to get Partition Reader metrics for ${kinesisStreamShard}", e)
}

Try(shardConsumer.consumerMetricsCollector.json) match {
case Success(metricsString) =>
logInfo(s"Shard Consumer log metrics for ${kinesisStreamShard}: ${metricsString}")
case Failure(e) =>
logError("failed to get Shard Consumer metrics for ${kinesisStreamShard}", e)
}

}

override def close(): Unit = {
logInfo(s"Start to close ${sourcePartition.startShardInfo} current value of closed=${closed}")
if(closed.compareAndSet(false, true)) {

logMetrics()
logInfo(s"dataQueue size before clear ${dataQueue.size()}")

// clear the queue to unblock enqueue operations
dataQueue.clear()

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.kinesis.metrics

import scala.collection.JavaConverters._
import com.codahale.metrics.Counter
import com.codahale.metrics.MetricRegistry
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

class PartitionReaderMetrics() {

private val metricRegistry = new MetricRegistry

private def getCounter(name: String): Counter = {
metricRegistry.counter(MetricRegistry.name("PartitionReader", name))
}

val enqueueRecordCounter: Counter = getCounter("enqueueRecordCounter")
val enqueueEmptyRecordCounter: Counter = getCounter("enqueueEmptyRecordCounter")
val enqueueFailureCounter: Counter = getCounter("enqueueFailureCounter")
val userRecordCounter: Counter = getCounter("userRecordCounter")
val shardEndUserRecordCounter: Counter = getCounter("shardEndUserRecordCounter")
val emptyUserRecordCounter: Counter = getCounter("emptyUserRecordCounter")
val zeroLengthUserRecordCounter: Counter = getCounter("zeroLengthUserRecordCounter")


def json: String = {
Serialization.write(
metricRegistry.getCounters.asScala.map { kv =>
(kv._1, kv._2.getCount)
}
)(PartitionReaderMetrics.format)
}
}

object PartitionReaderMetrics {
val format = Serialization.formats(NoTypeHints)

def apply(): PartitionReaderMetrics = {
new PartitionReaderMetrics()
}
}
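A small, self-contained sketch of how these counters surface: the key names come from MetricRegistry.name("PartitionReader", ...), so the JSON shape in the comment is the expected output rather than captured output.

```scala
// Standalone sketch: exercise PartitionReaderMetrics and print its JSON snapshot.
// Counter names are registered as MetricRegistry.name("PartitionReader", <name>), so
// the expected output looks like:
//   {"PartitionReader.enqueueRecordCounter":2,"PartitionReader.userRecordCounter":1,...}
import org.apache.spark.sql.connector.kinesis.metrics.PartitionReaderMetrics

object PartitionReaderMetricsDemo {
  def main(args: Array[String]): Unit = {
    val metrics = PartitionReaderMetrics()
    metrics.enqueueRecordCounter.inc()
    metrics.enqueueRecordCounter.inc()
    metrics.userRecordCounter.inc()
    println(metrics.json)
  }
}
```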

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.kinesis.metrics

import scala.collection.JavaConverters._
import com.codahale.metrics.Counter
import com.codahale.metrics.MetricRegistry
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

class ShardConsumerMetrics() {

private val metricRegistry = new MetricRegistry

private def getCounter(name: String): Counter = {
metricRegistry.counter(MetricRegistry.name("ShardConsumer", name))
}

val rawRecordsCounter: Counter = getCounter("rawRecordsCounter")
val userRecordsCounter: Counter = getCounter("userRecordsCounter")
val batchCounter: Counter = getCounter("batchCounter")


def json: String = {
Serialization.write(
metricRegistry.getCounters.asScala.map { kv =>
(kv._1, kv._2.getCount)
}
)(ShardConsumerMetrics.format)
}
}

object ShardConsumerMetrics {
val format = Serialization.formats(NoTypeHints)

def apply(): ShardConsumerMetrics = {
new ShardConsumerMetrics()
}
}
@@ -20,6 +20,7 @@ package org.apache.spark.sql.connector.kinesis.retrieval
import scala.util.control.Breaks.break
import scala.util.control.Breaks.breakable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.kinesis.metrics.ShardConsumerMetrics
import org.apache.spark.sql.connector.kinesis.retrieval.RecordBatchPublisherRunStatus.CANCELLED
import org.apache.spark.sql.connector.kinesis.retrieval.RecordBatchPublisherRunStatus.COMPLETE
import org.apache.spark.sql.connector.kinesis.retrieval.SequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM
@@ -39,6 +40,7 @@
val recordBatchPublisher: RecordBatchPublisher) extends Runnable with Logging{

private var lastSequenceNum: SequenceNumber = SequenceNumber.toSequenceNumber(recordBatchPublisher.initialStartingPosition)
val consumerMetricsCollector: ShardConsumerMetrics = ShardConsumerMetrics()
val streamShard = recordBatchPublisher.streamShard

logInfo(s"ShardConsumer init on ${streamShard}, startingPosition: ${recordBatchPublisher.initialStartingPosition}")
@@ -54,7 +56,10 @@
s" batch size: ${batch.totalSizeInBytes}, " +
s"number of raw records ${batch.numberOfRawRecords}, " +
s"number of user records ${batch.numberOfDeaggregatedRecord}")

consumerMetricsCollector.batchCounter.inc()
consumerMetricsCollector.rawRecordsCounter.inc(batch.numberOfRawRecords)
consumerMetricsCollector.userRecordsCounter.inc(batch.numberOfDeaggregatedRecord)

batch.userRecords.foreach { userRecord =>
if (filterDeaggregatedRecord(userRecord)) {
enqueueRecord(userRecord)
@@ -263,10 +263,10 @@ class EfoShardSubscriber(val consumerArn: String,
break
} else {
if (subscriptionEvent.get.isSubscribeToShardEvent) { // Request for KDS to send the next record batch
logTrace(s"EfoShardSubscriber isSubscribeToShardEvent - ${consumerArn}::${streamShard}")
subscription.requestRecord()
val event: SubscribeToShardEvent = subscriptionEvent.get.getSubscribeToShardEvent
continuationSequenceNumber = event.continuationSequenceNumber
logTrace(s"EfoShardSubscriber isSubscribeToShardEvent - ${consumerArn}::${streamShard} - ${continuationSequenceNumber}")
eventConsumer.accept(event)
} else {
if (subscriptionEvent.get.isSubscriptionComplete) {
@@ -104,7 +104,8 @@ class PollingRecordBatchPublisher(
}
catch {
case eiEx: ExpiredIteratorException =>
logError(s"Encountered an unexpected expired iterator ${nextShardItr} for shard ${streamShard}. refreshing the iterator ...", eiEx)
logWarning(s"Encountered an expired iterator ${nextShardItr} for shard ${streamShard}." +
s" refreshing the iterator ...", eiEx)
nextShardItr = getShardIterator
// sleep for the fetch interval before the next getRecords attempt with the
// refreshed iterator
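For reference, the catch block above follows a refresh-and-retry pattern that, in isolation, looks roughly like the sketch below; the helper names and the SDK v2 exception import are assumptions, not the connector's actual structure.

```scala
// Generic refresh-and-retry sketch; getShardIterator, fetchRecords and
// fetchIntervalMillis are placeholders, and the exception import assumes AWS SDK v2.
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException

def getRecordsWithRetry(getShardIterator: () => String,
                        fetchRecords: String => Unit,
                        fetchIntervalMillis: Long): Unit = {
  var shardIterator = getShardIterator()
  var done = false
  while (!done) {
    try {
      fetchRecords(shardIterator) // throws ExpiredIteratorException if the iterator aged out
      done = true
    } catch {
      case _: ExpiredIteratorException =>
        // Expired iterators are expected on slow shards: refresh the iterator and wait
        // one fetch interval before retrying instead of failing the task.
        shardIterator = getShardIterator()
        Thread.sleep(fetchIntervalMillis)
    }
  }
}
```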