Skip to content

Commit

Permalink
Updated scan start_time ,end_time and range combinations (opensearch-…
Browse files Browse the repository at this point in the history
…project#3188)

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
  • Loading branch information
asifsmohammed authored Aug 18, 2023
1 parent f9a3a60 commit c06bc4a
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 266 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
*/
public class ScanOptions {
private static final Logger LOG = LoggerFactory.getLogger(ScanOptions.class);
private LocalDateTime startDateTime;
private final LocalDateTime startDateTime;

private Duration range;
private final Duration range;

private S3ScanBucketOption bucketOption;
private final S3ScanBucketOption bucketOption;

private LocalDateTime endDateTime;
private final LocalDateTime endDateTime;

private LocalDateTime useStartDateTime;
private final LocalDateTime useStartDateTime;

private LocalDateTime useEndDateTime;
private final LocalDateTime useEndDateTime;

private ScanOptions(Builder builder){
this.startDateTime = builder.startDateTime;
Expand Down Expand Up @@ -96,11 +96,7 @@ public Builder setBucketOption(S3ScanBucketOption bucketOption) {
}

public ScanOptions build() {
LocalDateTime bucketStartDateTime = Objects.isNull(bucketOption.getStartTime()) ? startDateTime : bucketOption.getStartTime();
LocalDateTime bucketEndDateTime = Objects.isNull(bucketOption.getEndTime()) ? endDateTime : bucketOption.getEndTime();
Duration bucketRange = Objects.isNull(bucketOption.getRange()) ? range : bucketOption.getRange();

long nonNullCount = Stream.of(bucketStartDateTime, bucketEndDateTime, bucketRange)
long globalLevelNonNullCount = Stream.of(startDateTime, endDateTime, range)
.filter(Objects::nonNull)
.count();

Expand All @@ -109,54 +105,35 @@ public ScanOptions build() {
.filter(Objects::nonNull)
.count();

if (nonNullCount == 3) {
if (originalBucketLevelNonNullCount == 3) {
scanRangeDateValidationError();
} else if (originalBucketLevelNonNullCount == 2) {
setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange());
} else if (originalBucketLevelNonNullCount == 1) {
if (Objects.nonNull(bucketOption.getStartTime()) || Objects.nonNull(bucketOption.getEndTime())) {
setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange());
} else {
LOG.warn("Scan is configured with start_time and end_time at global level and range at bucket level for the bucket with name {}. " +
"Unable to establish a time period with range alone at bucket level. " +
"Using start_time and end_time configured at global level and ignoring range.", bucketOption.getName());
setDateTimeToUse(startDateTime, endDateTime, range);
}
}
} else {
setDateTimeToUse(bucketStartDateTime, bucketEndDateTime, bucketRange);
if (originalBucketLevelNonNullCount != 0) {
setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange());
} else if (globalLevelNonNullCount != 0) {
setDateTimeToUse(startDateTime, endDateTime, range);
}

return new ScanOptions(this);
}

private void setDateTimeToUse(LocalDateTime bucketStartDateTime, LocalDateTime bucketEndDateTime, Duration bucketRange) {

if (Objects.nonNull(bucketStartDateTime) && Objects.nonNull(bucketEndDateTime)) {
this.useStartDateTime = bucketStartDateTime;
this.useEndDateTime = bucketEndDateTime;
} else if (Objects.nonNull(bucketStartDateTime) && Objects.nonNull(bucketRange)) {
this.useStartDateTime = bucketStartDateTime;
this.useEndDateTime = bucketStartDateTime.plus(bucketRange);
} else if (Objects.nonNull(bucketEndDateTime) && Objects.nonNull(bucketRange)) {
this.useStartDateTime = bucketEndDateTime.minus(bucketRange);
this.useEndDateTime = bucketEndDateTime;
LOG.info("Scanning objects modified from {} to {} from bucket: {}", useStartDateTime, useEndDateTime, bucketOption.getName());
} else if (Objects.nonNull(bucketStartDateTime)) {
this.useStartDateTime = bucketStartDateTime;
LOG.info("Scanning objects modified after {} from bucket: {}", useStartDateTime, bucketOption.getName());
} else if (Objects.nonNull(bucketEndDateTime)) {
this.useEndDateTime = bucketEndDateTime;
LOG.info("Scanning objects modified before {} from bucket: {}", useEndDateTime, bucketOption.getName());
} else if (Objects.nonNull(bucketRange)) {
LOG.warn("Scan is configured with just range for the bucket with name {}. Unable to establish a time period with range alone. " +
"Configure start_time or end_time, else all the objects in the bucket will be included", bucketOption.getName());
this.useEndDateTime = LocalDateTime.now();
this.useStartDateTime = this.useEndDateTime.minus(bucketRange);
LOG.info("Scanning objects modified from {} to {} from bucket: {}", useStartDateTime, useEndDateTime, bucketOption.getName());
} else {
LOG.info("Scanning all objects from bucket: {}", bucketOption.getName());
}
}

private void scanRangeDateValidationError() {
String message = "To set a time range for the bucket with name " + bucketOption.getName() +
", specify any two configurations from start_time, end_time and range";
throw new IllegalArgumentException(message);
}

@Override
public String toString() {
return "startDateTime=" + startDateTime +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer;

import java.time.Duration;
import java.time.LocalDateTime;
Expand All @@ -27,11 +28,11 @@ public class S3ScanBucketOption {
@Size(min = 3, max = 500, message = "bucket length should be at least 3 characters")
private String name;

@JsonDeserialize(using = CustomLocalDateTimeDeserializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("start_time")
private LocalDateTime startTime;

@JsonDeserialize(using = CustomLocalDateTimeDeserializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("end_time")
private LocalDateTime endTime;

Expand All @@ -46,6 +47,11 @@ public boolean hasValidTimeOptions() {
return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3;
}

@AssertFalse(message = "bucket start_time or end_time cannot be used along with range")
public boolean hasValidTimeAndRangeOptions() {
return (startTime != null || endTime != null) && range != null;
}

public String getName() {
if (name.startsWith(S3_PREFIX)) {
return name.substring(S3_PREFIX.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer;

import java.time.Duration;
import java.time.LocalDateTime;
Expand All @@ -24,11 +25,11 @@ public class S3ScanScanOptions {
@JsonProperty("range")
private Duration range;

@JsonDeserialize(using = CustomLocalDateTimeDeserializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("start_time")
private LocalDateTime startTime;

@JsonDeserialize(using = CustomLocalDateTimeDeserializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("end_time")
private LocalDateTime endTime;

Expand All @@ -45,6 +46,11 @@ public boolean hasValidTimeOptions() {
return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3;
}

@AssertFalse(message = "start_time or end_time cannot be used along with range")
public boolean hasValidTimeAndRangeOptions() {
return (startTime != null || endTime != null) && range != null;
}

@AssertTrue(message = "start_time, end_time, and range are not valid options when using scheduling with s3 scan")
public boolean hasValidTimeOptionsWithScheduling() {
return !Objects.nonNull(schedulingOptions) || Stream.of(startTime, endTime, range).noneMatch(Objects::nonNull);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -57,12 +59,10 @@ void scan_service_test_and_verify_thread_invoking() {
void scan_service_with_valid_s3_scan_configuration_test_and_verify() {
final String bucketName="my-bucket-5";
final LocalDateTime startDateTime = LocalDateTime.parse("2023-03-07T10:00:00");
final Duration range = Duration.parse("P2DT1H");
final List<String> includeKeyPathList = List.of("file1.csv","file2.csv");
final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
final S3ScanScanOptions s3ScanScanOptions = mock(S3ScanScanOptions.class);
when(s3ScanScanOptions.getStartTime()).thenReturn(startDateTime);
when(s3ScanScanOptions.getRange()).thenReturn(range);
S3ScanBucketOptions bucket = mock(S3ScanBucketOptions.class);
final S3ScanBucketOption s3ScanBucketOption = mock(S3ScanBucketOption.class);
when(s3ScanBucketOption.getName()).thenReturn(bucketName);
Expand All @@ -78,13 +78,12 @@ void scan_service_with_valid_s3_scan_configuration_test_and_verify() {
assertThat(scanOptionsBuilder.get(0).getBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions(),sameInstance(includeKeyPathList));
assertThat(scanOptionsBuilder.get(0).getBucketOption().getName(),sameInstance(bucketName));
assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(),equalTo(startDateTime));
assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(),equalTo(startDateTime.plus(range)));
assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(),equalTo(null));
}

@Test
void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() {
final String bucketName="my-bucket-5";
final LocalDateTime startDateTime = LocalDateTime.parse("2023-03-07T10:00:00");
final Duration range = Duration.parse("P2DT1H");
final List<String> includeKeyPathList = List.of("file1.csv","file2.csv");
final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
Expand All @@ -94,7 +93,6 @@ void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() {
when(s3ScanBucketOption.getName()).thenReturn(bucketName);
S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class);
when(s3ScanKeyPathOption.getS3scanIncludePrefixOptions()).thenReturn(includeKeyPathList);
when(s3ScanBucketOption.getStartTime()).thenReturn(startDateTime);
when(s3ScanBucketOption.getRange()).thenReturn(range);
when(s3ScanBucketOption.getS3ScanFilter()).thenReturn(s3ScanKeyPathOption);
when(bucket.getS3ScanBucketOption()).thenReturn(s3ScanBucketOption);
Expand All @@ -104,8 +102,10 @@ void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() {
final List<ScanOptions> scanOptionsBuilder = service.getScanOptions();
assertThat(scanOptionsBuilder.get(0).getBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions(),sameInstance(includeKeyPathList));
assertThat(scanOptionsBuilder.get(0).getBucketOption().getName(),sameInstance(bucketName));
assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(),equalTo(startDateTime));
assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(),equalTo(startDateTime.plus(range)));
assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(), lessThanOrEqualTo(LocalDateTime.now().minus(range)));
assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(), greaterThanOrEqualTo(LocalDateTime.now().minus(range).minus(Duration.parse("PT5S"))));
assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(), lessThanOrEqualTo(LocalDateTime.now()));
assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(), greaterThanOrEqualTo(LocalDateTime.now().minus(Duration.parse("PT5S"))));
}

@Test
Expand Down
Loading

0 comments on commit c06bc4a

Please sign in to comment.