Add support to use old ddb stream image for REMOVE events #4275

Merged
merged 2 commits on Mar 19, 2024
4 changes: 0 additions & 4 deletions data-prepper-plugins/dynamodb-source/build.gradle
@@ -27,8 +27,4 @@ dependencies {

testImplementation testLibs.mockito.inline
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

test {
useJUnitPlatform()
}
@@ -98,7 +98,7 @@ public void start(Buffer<Record<Event>> buffer) {
DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, dynamoDBSourceConfig.getTableConfigs().get(0).getStreamConfig());
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null));
// leader scheduler will handle the initialization
Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);
@@ -109,8 +109,10 @@ public void start(Buffer<Record<Event>> buffer) {
executor.submit(leaderScheduler);
executor.submit(exportScheduler);
executor.submit(fileLoaderScheduler);
executor.submit(streamScheduler);

if (tableConfigs.get(0).getStreamConfig() != null) {
executor.submit(streamScheduler);
}
}

/**
@@ -6,14 +6,22 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;

public class StreamConfig {

@JsonProperty(value = "start_position")
private StreamStartPosition startPosition = StreamStartPosition.LATEST;

@JsonProperty("view_on_remove")
private StreamViewType viewForRemoves = StreamViewType.NEW_IMAGE;

public StreamStartPosition getStartPosition() {
return startPosition;
}

public StreamViewType getStreamViewForRemoves() {
return viewForRemoves;
}

}
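
Not part of the diff: a minimal sketch of how the new view_on_remove option could be exercised. The YAML keys mirror the @JsonProperty names declared in StreamConfig above; the class name and the exact placement of this block inside a pipeline definition are assumptions for illustration, and the parsing uses jackson-dataformat-yaml, which the build file already lists as a test dependency.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;

// Hypothetical illustration class; not part of this PR.
public class StreamConfigViewOnRemoveSketch {
    public static void main(String[] args) throws Exception {
        // A stream block as it might appear under a dynamodb source table config;
        // key names come straight from the @JsonProperty annotations in StreamConfig.
        final String streamYaml =
                "start_position: LATEST\n" +
                "view_on_remove: OLD_IMAGE\n";

        final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
        final StreamConfig streamConfig = yamlMapper.readValue(streamYaml, StreamConfig.class);

        // With OLD_IMAGE configured, StreamRecordConverter prefers the old image
        // when it converts REMOVE events (see getStreamRecordFromImage in the next file).
        System.out.println(streamConfig.getStreamViewForRemoves() == StreamViewType.OLD_IMAGE); // true
    }
}
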
@@ -14,12 +14,15 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.OperationType;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;

import java.time.Instant;
import java.util.HashMap;
@@ -40,6 +43,8 @@ public class StreamRecordConverter extends RecordConverter {
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {
};

private final StreamConfig streamConfig;

private final PluginMetrics pluginMetrics;

private final Counter changeEventSuccessCounter;
@@ -50,13 +55,18 @@ public class StreamRecordConverter extends RecordConverter {
private Instant currentSecond;
private int recordsSeenThisSecond = 0;

public StreamRecordConverter(final BufferAccumulator<org.opensearch.dataprepper.model.record.Record<Event>> bufferAccumulator, TableInfo tableInfo, PluginMetrics pluginMetrics) {
public StreamRecordConverter(final BufferAccumulator<org.opensearch.dataprepper.model.record.Record<Event>> bufferAccumulator,
final TableInfo tableInfo,
final PluginMetrics pluginMetrics,
final StreamConfig streamConfig) {
super(bufferAccumulator, tableInfo);
this.pluginMetrics = pluginMetrics;
this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT);
this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
this.streamConfig = streamConfig;

}

@Override
@@ -73,8 +83,10 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
Map<String, Object> data;
Map<String, Object> keys;
try {
final Map<String, AttributeValue> streamRecord = getStreamRecordFromImage(record);

// NewImage may be empty
data = convertData(record.dynamodb().newImage());
data = convertData(streamRecord);
// Always get keys from dynamodb().keys()
keys = convertKeys(record.dynamodb().keys());
} catch (final Exception e) {
@@ -150,4 +162,21 @@ private long calculateTieBreakingVersionFromTimestamp(final Instant eventTimeInS

return eventTimeInSeconds.getEpochSecond() * 1_000_000 + recordsSeenThisSecond;
}

private Map<String, AttributeValue> getStreamRecordFromImage(final Record record) {
if (!OperationType.REMOVE.equals(record.eventName())) {
return record.dynamodb().newImage();
}

if (StreamViewType.OLD_IMAGE.equals(streamConfig.getStreamViewForRemoves())) {
if (!record.dynamodb().hasOldImage()) {
LOG.warn("view_on_remove with OLD_IMAGE is enabled, but no old image can be found on the stream record, using NEW_IMAGE");
return record.dynamodb().newImage();
} else {
return record.dynamodb().oldImage();
}
}

return record.dynamodb().newImage();
}
}
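
Not part of the diff: a small sketch of the record shape the new getStreamRecordFromImage branch handles, namely a REMOVE event whose stream record carries only keys and an old image. It uses the AWS SDK v2 model builders that the converter already imports; the class name and attribute values below are made up for illustration only.

import java.util.Map;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.OperationType;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.StreamRecord;

// Hypothetical illustration class; not part of this PR.
public class RemoveEventRecordSketch {
    public static void main(String[] args) {
        // REMOVE events carry no new image; when the table stream uses OLD_IMAGE or
        // NEW_AND_OLD_IMAGES, the old image is the only full picture of the deleted item.
        final Map<String, AttributeValue> oldImage = Map.of(
                "id", AttributeValue.builder().s("item-1").build(),
                "status", AttributeValue.builder().s("archived").build());

        final Record removeRecord = Record.builder()
                .eventName(OperationType.REMOVE)
                .dynamodb(StreamRecord.builder()
                        .keys(Map.of("id", AttributeValue.builder().s("item-1").build()))
                        .oldImage(oldImage)
                        .build())
                .build();

        // With view_on_remove set to OLD_IMAGE, getStreamRecordFromImage(removeRecord)
        // returns the old image above; with the default NEW_IMAGE it falls back to
        // newImage(), which is empty for this record.
        System.out.println(removeRecord.dynamodb().hasOldImage()); // true
    }
}
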
@@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
@@ -121,7 +122,7 @@ private ShardConsumer(Builder builder) {
this.startTime = builder.startTime == null ? Instant.MIN : builder.startTime.minus(STREAM_EVENT_OVERLAP_TIME);
this.waitForExport = builder.waitForExport;
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics);
recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics, builder.streamConfig);
this.acknowledgementSet = builder.acknowledgementSet;
this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout;
this.shardId = builder.shardId;
@@ -132,8 +133,9 @@
public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
final Buffer<Record<Event>> buffer,
final StreamConfig streamConfig) {
return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig);
}


@@ -164,14 +166,18 @@ static class Builder {
private AcknowledgementSet acknowledgementSet;
private Duration dataFileAcknowledgmentTimeout;

private StreamConfig streamConfig;

public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final StreamConfig streamConfig) {
this.dynamoDbStreamsClient = dynamoDbStreamsClient;
this.pluginMetrics = pluginMetrics;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
this.buffer = buffer;
this.streamConfig = streamConfig;
}

public Builder tableInfo(TableInfo tableInfo) {
@@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
@@ -45,18 +46,21 @@ public class ShardConsumerFactory {
private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;
private final Buffer<Record<Event>> buffer;

private final StreamConfig streamConfig;


public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordinator,
final DynamoDbStreamsClient streamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final StreamConfig streamConfig) {
this.streamsClient = streamsClient;
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.pluginMetrics = pluginMetrics;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
this.buffer = buffer;

this.streamConfig = streamConfig;
}

public Runnable createConsumer(final StreamPartition streamPartition,
@@ -97,7 +101,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,

LOG.debug("Create shard consumer for {} with shardIter {}", streamPartition.getShardId(), shardIterator);
LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator);
ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer)
ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig)
.tableInfo(tableInfo)
.checkpointer(checkpointer)
.shardIterator(shardIterator)
@@ -8,24 +8,36 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class DynamoDBServiceTest {
@@ -51,6 +63,9 @@ class DynamoDBServiceTest {
@Mock
private TableConfig tableConfig;

@Mock
private StreamConfig streamConfig;

@Mock
private PluginMetrics pluginMetrics;

@@ -60,6 +75,9 @@
@Mock
private AcknowledgementSetManager acknowledgementSetManager;

@Mock
private ExecutorService executorService;

private DynamoDBService dynamoDBService;

@BeforeEach
@@ -73,23 +91,54 @@ void setup() {
}

private DynamoDBService createObjectUnderTest() {
DynamoDBService objectUnderTest = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager);
return objectUnderTest;

try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(() -> Executors.newFixedThreadPool(eq(4))).thenReturn(executorService);

return new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager);
}
}

@Test
void test_normal_start() {
void test_normal_start_with_stream_config() {
when(tableConfig.getStreamConfig()).thenReturn(streamConfig);
dynamoDBService = createObjectUnderTest();
assertThat(dynamoDBService, notNullValue());

final ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);

dynamoDBService.start(buffer);

verify(executorService, times(4)).submit(runnableArgumentCaptor.capture());

assertThat(runnableArgumentCaptor.getAllValues(), notNullValue());
assertThat(runnableArgumentCaptor.getAllValues().size(), equalTo(4));
}

@Test
void test_normal_start_without_stream_config() {
when(tableConfig.getStreamConfig()).thenReturn(null);

dynamoDBService = createObjectUnderTest();
assertThat(dynamoDBService, notNullValue());

final ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);

dynamoDBService.start(buffer);

verify(executorService, times(3)).submit(runnableArgumentCaptor.capture());

assertThat(runnableArgumentCaptor.getAllValues(), notNullValue());
assertThat(runnableArgumentCaptor.getAllValues().size(), equalTo(3));
}


@Test
void test_normal_shutdown() {
dynamoDBService = createObjectUnderTest();
assertThat(dynamoDBService, notNullValue());

when(executorService.shutdownNow()).thenReturn(Collections.emptyList());
dynamoDBService.shutdown();
}
