From e31c376ab9b336009c525180a40b1d1cd49bf0d6 Mon Sep 17 00:00:00 2001 From: Lee Hannigan Date: Sun, 29 Sep 2024 14:20:35 +0100 Subject: [PATCH] Making small adjustments to ttl_delete based on review Signed-off-by: Lee Hannigan --- .../converter/MetadataKeyAttributes.java | 2 +- .../dynamodb/converter/RecordConverter.java | 19 ++++++-- .../converter/StreamRecordConverter.java | 3 +- .../converter/StreamRecordConverterTest.java | 45 ++++++++++++++++--- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java index 4d6db1dc8f..4f0166581b 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java @@ -21,5 +21,5 @@ public class MetadataKeyAttributes { static final String DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE = "dynamodb_event_name"; static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name"; - static final String DDB_STREAM_EVENT_USER_IDENTITY = "ttl_delete"; + static final String DDB_STREAM_EVENT_IS_TTL_DELETE = "ttl_delete"; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java index 5b50f2cd5e..dd4c6fe3e3 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java @@ -13,6 +13,8 @@ import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; +import software.amazon.awssdk.services.dynamodb.model.Identity; +import software.amazon.awssdk.services.dynamodb.model.OperationType; import java.math.BigDecimal; import java.time.Instant; @@ -26,7 +28,7 @@ import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_USER_IDENTITY; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE; /** * Base Record Processor definition. @@ -40,6 +42,8 @@ public abstract class RecordConverter { private final BufferAccumulator> bufferAccumulator; private final TableInfo tableInfo; + static final String TTL_USER_PRINCIPAL = "dynamodb.amazonaws.com"; + static final String TTL_USER_TYPE = "Service"; public RecordConverter(final BufferAccumulator> bufferAccumulator, TableInfo tableInfo) { this.bufferAccumulator = bufferAccumulator; @@ -86,7 +90,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet, final long eventCreationTimeMillis, final long eventVersionNumber, final String eventName, - final Boolean userIdentity) throws Exception { + final Identity userIdentity) throws Exception { Event event = JacksonEvent.builder() .withEventType(getEventType()) .withData(data) @@ -98,6 +102,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet, event.getEventHandle().setExternalOriginationTime(externalOriginationTime); event.getMetadata().setExternalOriginationTime(externalOriginationTime); } + EventMetadata eventMetadata = event.getMetadata(); eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableInfo.getTableName()); @@ -105,7 +110,13 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet, eventMetadata.setAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName); eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName)); eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber); - eventMetadata.setAttribute(DDB_STREAM_EVENT_USER_IDENTITY, userIdentity); + + // Only set ttl_delete for stream events, which are of type REMOVE containing a userIdentity + final boolean isTtlDelete = OperationType.REMOVE.toString().equals(eventName) && + userIdentity != null && + TTL_USER_PRINCIPAL.equals(userIdentity.principalId()) && + TTL_USER_TYPE.equals(userIdentity.type()); + eventMetadata.setAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE, isTtlDelete); String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName()); eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey); @@ -127,7 +138,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet, final Map data, final long timestamp, final long eventVersionNumber) throws Exception { - addToBuffer(acknowledgementSet, data, data, timestamp, eventVersionNumber, null, Boolean.FALSE); + addToBuffer(acknowledgementSet, data, data, timestamp, eventVersionNumber, null, null); } private String mapStreamEventNameToBulkAction(final String streamEventName) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index ed7f27036f..85f7df05d2 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -80,7 +80,6 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List data; Map keys; try { @@ -99,7 +98,7 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build()); + List records = Collections.singletonList(buildRecord(Instant.now(), newImage, null, OperationType.REMOVE, userIdentity)); + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(null, records); + + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + verify(changeEventSuccessCounter).increment(anyDouble()); + assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); + JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData(); + + assertThat(event.getMetadata(), notNullValue()); + assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE), equalTo(Boolean.FALSE)); }