Skip to content

Commit

Permalink
Experimental support for sequence number validation in the publisher (#…
Browse files Browse the repository at this point in the history
…401)

* Moved sequence number validation to an experimental feature

Moved the sequence number validation to become an experimental feature
that can be removed in the future.

Added an annotation for experimental features.

* Delete merge conflict again?

* Add some reminder that this stuff is experimental

* Added a reason field, and some reasons

Added a reason value to the annotation, and updated two of the unusual places.
  • Loading branch information
pfifer authored and sahilpalvia committed Sep 18, 2018
1 parent 01f5db8 commit 592499f
Show file tree
Hide file tree
Showing 11 changed files with 357 additions and 200 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.annotations;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/**
* Anything marked as experimental may be removed at any time without warning.
*/
@Retention(RetentionPolicy.CLASS)
public @interface KinesisClientExperimental {
String reason() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.annotations.KinesisClientExperimental;

@Data
@Accessors(fluent = true)
Expand Down Expand Up @@ -80,18 +81,13 @@ public class FanOutConfig implements RetrievalSpecificConfig {
*/
private long retryBackoffMillis = 1000;

/**
* Controls whether the {@link FanOutRecordsPublisher} will validate that all the records are from the shard it's
* processing.
*/
private boolean validateRecordsAreForShard = false;

@Override
public RetrievalFactory retrievalFactory() {
return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn()).validateRecordsAreForShard(validateRecordsAreForShard);
return new FanOutRetrievalFactory(kinesisClient, getOrCreateConsumerArn());
}

private String getOrCreateConsumerArn() {
@KinesisClientExperimental(reason = "Experimentally changed from private to protected")
protected String getOrCreateConsumerArn() {
if (consumerArn != null) {
return consumerArn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

Expand All @@ -40,13 +37,13 @@
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.SequenceNumberValidator;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@Slf4j
Expand All @@ -56,14 +53,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private final KinesisAsyncClient kinesis;
private final String shardId;
private final String consumerArn;
private final boolean validateRecordShardMatching;

/**
* Creates a new FanOutRecordsPublisher.
* <p>
* This is deprecated and will be removed in a later release. Use
* {@link #FanOutRecordsPublisher(KinesisAsyncClient, String, String, boolean)} instead
* </p>
*
* @param kinesis
* the kinesis client to use for requests
Expand All @@ -72,17 +64,10 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
* @param consumerArn
* the consumer to use when retrieving records
*/
@Deprecated
public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
this(kinesis, shardId, consumerArn, false);
}

public FanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn,
boolean validateRecordShardMatching) {
this.kinesis = kinesis;
this.shardId = shardId;
this.consumerArn = consumerArn;
this.validateRecordShardMatching = validateRecordShardMatching;
}

private final Object lockObject = new Object();
Expand Down Expand Up @@ -383,7 +368,8 @@ public void cancel() {
}
subscriber = null;
if (flow != null) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
availableQueueSpace = 0;
Expand All @@ -408,6 +394,20 @@ private boolean isActiveFlow(RecordFlow requester) {
}
}

/**
* Allows validating records received from SubscribeToShard
*
* @param shardId
* the shardId the records should be from
* @param event
* the SubscribeToShard event that was received.
* @throws IllegalArgumentException
* if the records are invalid. This will trigger an error response upwards
*/
@KinesisClientExperimental(reason = "Allows providing a validation function with minimal changes")
protected void validateRecords(String shardId, SubscribeToShardEvent event) {
}

private void rejectSubscription(SdkPublisher<SubscribeToShardEventStream> publisher) {
publisher.subscribe(new Subscriber<SubscribeToShardEventStream>() {
Subscription localSub;
Expand Down Expand Up @@ -588,7 +588,6 @@ static class RecordSubscription implements Subscriber<SubscribeToShardEventStrea
private final RecordFlow flow;
private final Instant connectionStartedAt;
private final String subscribeToShardId;
private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator();

private Subscription subscription;

Expand Down Expand Up @@ -657,7 +656,14 @@ public void onNext(SubscribeToShardEventStream recordBatchEvent) {
recordBatchEvent.accept(new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
if (parent.validateRecordShardMatching && !areRecordsValid(event)) {
try {
parent.validateRecords(parent.shardId, event);
} catch (IllegalArgumentException iae) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]",
parent.shardId, connectionStartedAt, subscribeToShardId,
System.identityHashCode(subscription), iae.getMessage());
parent.errorOccurred(flow, iae);
return;
}
flow.recordsReceived(event);
Expand All @@ -666,45 +672,6 @@ public void visit(SubscribeToShardEvent event) {
}
}

private boolean areRecordsValid(SubscribeToShardEvent event) {
try {
Map<String, Integer> mismatchedRecords = recordsNotForShard(event);
if (mismatchedRecords.size() > 0) {
String mismatchReport = mismatchedRecords.entrySet().stream()
.map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue()))
.collect(Collectors.joining(", "));
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- Failing subscription due to mismatches: [ {} ]",
parent.shardId, connectionStartedAt, subscribeToShardId,
System.identityHashCode(subscription), mismatchReport);
parent.errorOccurred(flow, new IllegalArgumentException(
"Received records destined for different shards: " + mismatchReport));
return false;
}
} catch (IllegalArgumentException iae) {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onNext#vistor) @ {} id: {} (Subscription ObjectId: {}) -- "
+ "A problem occurred while validating sequence numbers: {} on subscription {}",
parent.shardId, connectionStartedAt, subscribeToShardId, System.identityHashCode(subscription),
iae.getMessage(), iae);
parent.errorOccurred(flow, iae);
return false;
}
return true;
}

private Map<String, Integer> recordsNotForShard(SubscribeToShardEvent event) {
return event.records().stream().map(r -> {
Optional<String> res = sequenceNumberValidator.shardIdFor(r.sequenceNumber());
if (!res.isPresent()) {
throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber());
}
return res.get();
}).filter(s -> !StringUtils.equalsIgnoreCase(s, parent.shardId))
.collect(Collectors.groupingBy(Function.identity())).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
}

@Override
public void onError(Throwable t) {
log.debug("{}: [SubscriptionLifetime]: (RecordSubscription#onError) @ {} id: {} -- {}: {}", parent.shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

package software.amazon.kinesis.retrieval.fanout;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
Expand All @@ -35,9 +33,6 @@ public class FanOutRetrievalFactory implements RetrievalFactory {

private final KinesisAsyncClient kinesisClient;
private final String consumerArn;
@Getter
@Setter
private boolean validateRecordsAreForShard = false;

@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
Expand All @@ -48,6 +43,6 @@ public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final Shard
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final MetricsFactory metricsFactory) {
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn, validateRecordsAreForShard);
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), consumerArn);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval.fanout.experimental;

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;

/**
* Enables validation of sequence number for every received record.
*
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
*/
@KinesisClientExperimental
public class ExperimentalFanOutConfig extends FanOutConfig {

public ExperimentalFanOutConfig(KinesisAsyncClient kinesisClient) {
super(kinesisClient);
}

@Override
public RetrievalFactory retrievalFactory() {
return new ExperimentalFanOutRetrievalFactory(kinesisClient(), getOrCreateConsumerArn());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval.fanout.experimental;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.checkpoint.SequenceNumberValidator;
import software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher;

/**
* A variation of {@link FanOutRecordsPublisher} that provides validation of every record received by the publisher.
*
* <h2><strong>This is an experimental class and may be removed at any time</strong></h2>
*/
@Slf4j
@KinesisClientExperimental
public class ExperimentalFanOutRecordsPublisher extends FanOutRecordsPublisher {

private final SequenceNumberValidator sequenceNumberValidator = new SequenceNumberValidator();

/**
* Creates a new FanOutRecordsPublisher.
*
* @param kinesis
* the kinesis client to use for requests
* @param shardId
* the shardId to retrieve records for
* @param consumerArn
*/
public ExperimentalFanOutRecordsPublisher(KinesisAsyncClient kinesis, String shardId, String consumerArn) {
super(kinesis, shardId, consumerArn);
}

@Override
protected void validateRecords(String shardId, SubscribeToShardEvent event) {
Map<String, Integer> mismatchedRecords = recordsNotForShard(shardId, event);
if (mismatchedRecords.size() > 0) {
String mismatchReport = mismatchedRecords.entrySet().stream()
.map(e -> String.format("(%s -> %d)", e.getKey(), e.getValue())).collect(Collectors.joining(", "));
throw new IllegalArgumentException("Received records destined for different shards: " + mismatchReport);
}

}

private Map<String, Integer> recordsNotForShard(String shardId, SubscribeToShardEvent event) {
return event.records().stream().map(r -> {
Optional<String> res = sequenceNumberValidator.shardIdFor(r.sequenceNumber());
if (!res.isPresent()) {
throw new IllegalArgumentException("Unable to validate sequence number of " + r.sequenceNumber());
}
return res.get();
}).filter(s -> !StringUtils.equalsIgnoreCase(s, shardId)).collect(Collectors.groupingBy(Function.identity()))
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
}
}
Loading

0 comments on commit 592499f

Please sign in to comment.