Skip to content

Commit

Permalink
Making small adjustments to ttl_delete based on review
Browse files Browse the repository at this point in the history
Signed-off-by: Lee Hannigan <lhnng@amazon.com>
  • Loading branch information
Lee Hannigan committed Sep 29, 2024
1 parent f1a3598 commit e31c376
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ public class MetadataKeyAttributes {
static final String DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE = "dynamodb_event_name";

static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
static final String DDB_STREAM_EVENT_USER_IDENTITY = "ttl_delete";
static final String DDB_STREAM_EVENT_IS_TTL_DELETE = "ttl_delete";
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import software.amazon.awssdk.services.dynamodb.model.Identity;
import software.amazon.awssdk.services.dynamodb.model.OperationType;

import java.math.BigDecimal;
import java.time.Instant;
Expand All @@ -26,7 +28,7 @@
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_USER_IDENTITY;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE;

/**
* Base Record Processor definition.
Expand All @@ -40,6 +42,8 @@ public abstract class RecordConverter {
private final BufferAccumulator<Record<Event>> bufferAccumulator;

private final TableInfo tableInfo;
static final String TTL_USER_PRINCIPAL = "dynamodb.amazonaws.com";
static final String TTL_USER_TYPE = "Service";

public RecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator, TableInfo tableInfo) {
this.bufferAccumulator = bufferAccumulator;
Expand Down Expand Up @@ -86,7 +90,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
final long eventCreationTimeMillis,
final long eventVersionNumber,
final String eventName,
final Boolean userIdentity) throws Exception {
final Identity userIdentity) throws Exception {
Event event = JacksonEvent.builder()
.withEventType(getEventType())
.withData(data)
Expand All @@ -98,14 +102,21 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
event.getMetadata().setExternalOriginationTime(externalOriginationTime);
}

EventMetadata eventMetadata = event.getMetadata();

eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableInfo.getTableName());
eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
eventMetadata.setAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName);
eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName));
eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);
eventMetadata.setAttribute(DDB_STREAM_EVENT_USER_IDENTITY, userIdentity);

// Only set ttl_delete for stream events, which are of type REMOVE containing a userIdentity
final boolean isTtlDelete = OperationType.REMOVE.toString().equals(eventName) &&
userIdentity != null &&
TTL_USER_PRINCIPAL.equals(userIdentity.principalId()) &&
TTL_USER_TYPE.equals(userIdentity.type());
eventMetadata.setAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE, isTtlDelete);

String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName());
eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey);
Expand All @@ -127,7 +138,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
final Map<String, Object> data,
final long timestamp,
final long eventVersionNumber) throws Exception {
addToBuffer(acknowledgementSet, data, data, timestamp, eventVersionNumber, null, Boolean.FALSE);
addToBuffer(acknowledgementSet, data, data, timestamp, eventVersionNumber, null, null);
}

private String mapStreamEventNameToBulkAction(final String streamEventName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
int eventCount = 0;
for (Record record : records) {
final long bytes = record.dynamodb().sizeBytes();
final Boolean userIdentity = record.userIdentity() != null && "dynamodb.amazonaws.com".equals(record.userIdentity().principalId());
Map<String, Object> data;
Map<String, Object> keys;
try {
Expand All @@ -99,7 +98,7 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
try {
bytesReceivedSummary.record(bytes);
final long eventCreationTimeMillis = calculateTieBreakingVersionFromTimestamp(record.dynamodb().approximateCreationDateTime());
addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), eventCreationTimeMillis, record.eventNameAsString(), userIdentity);
addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), eventCreationTimeMillis, record.eventNameAsString(), record.userIdentity());
bytesProcessedSummary.record(bytes);
eventCount++;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_USER_IDENTITY;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_PROCESSED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_RECEIVED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT;
Expand Down Expand Up @@ -98,6 +98,8 @@ class StreamRecordConverterTest {

private final String partitionKeyAttrName = "PK";
private final String sortKeyAttrName = "SK";
private final String principalId = "dynamodb.amazonaws.com";
private final String userIdentityType = "Service";


@BeforeEach
Expand Down Expand Up @@ -214,7 +216,7 @@ void test_writeSingleRecordToBuffer_with_other_data(final String additionalStrin
assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_USER_IDENTITY), equalTo(Boolean.FALSE));
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE), equalTo(false));

assertThat(event.get(partitionKeyAttrName, String.class), notNullValue());
assertThat(event.get(sortKeyAttrName, String.class), notNullValue());
Expand Down Expand Up @@ -486,8 +488,8 @@ void test_writeSingleRecordToBuffer_with_userIdentity() throws Exception {
when(streamConfig.getStreamViewForRemoves()).thenReturn(StreamViewType.OLD_IMAGE);

final Identity userIdentity = Identity.builder()
.principalId("dynamodb.amazonaws.com")
.type("Service")
.principalId(principalId)
.type(userIdentityType)
.build();

final String newImageKey = UUID.randomUUID().toString();
Expand All @@ -509,7 +511,40 @@ void test_writeSingleRecordToBuffer_with_userIdentity() throws Exception {
JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData();

assertThat(event.getMetadata(), notNullValue());
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_USER_IDENTITY), equalTo(Boolean.TRUE));
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE), equalTo(Boolean.TRUE));

}

@Test
void test_writeSingleRecordToBuffer_with_wrong_userIdentity() throws Exception {

when(streamConfig.getStreamViewForRemoves()).thenReturn(StreamViewType.OLD_IMAGE);

final Identity userIdentity = Identity.builder()
.principalId("lambda.amazonaws.com")
.type(userIdentityType)
.build();

final String newImageKey = UUID.randomUUID().toString();
final String newImageValue = UUID.randomUUID().toString();

final Map<String, AttributeValue> newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build());
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, null, OperationType.REMOVE, userIdentity));
final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0);
final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig);
doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture());

objectUnderTest.writeToBuffer(null, records);

verify(bufferAccumulator).add(any(Record.class));
verify(bufferAccumulator).flush();
verify(changeEventSuccessCounter).increment(anyDouble());
assertThat(recordArgumentCaptor.getValue().getData(), notNullValue());
JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData();

assertThat(event.getMetadata(), notNullValue());
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE), equalTo(Boolean.FALSE));

}

Expand Down

0 comments on commit e31c376

Please sign in to comment.