Skip to content

Commit

Permalink
Add origination time to buffer event and populate the partition key (#…
Browse files Browse the repository at this point in the history
…4971)

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Co-authored-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sb2k16 and sbose2k21 committed Sep 26, 2024
1 parent 54873bf commit 5ffc738
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
Expand All @@ -29,10 +31,21 @@ public KinesisRecordConverter(final InputCodec codec) {
this.codec = codec;
}

public List<Record<Event>> convert(List<KinesisClientRecord> kinesisClientRecords) throws IOException {
public List<Record<Event>> convert(List<KinesisClientRecord> kinesisClientRecords,
final String streamName) throws IOException {
List<Record<Event>> records = new ArrayList<>();
for (KinesisClientRecord record : kinesisClientRecords) {
processRecord(record, records::add);
for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) {
processRecord(kinesisClientRecord, record -> {
records.add(record);
Event event = record.getData();
EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE,
streamName.toLowerCase());
eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE, kinesisClientRecord.partitionKey());
final Instant externalOriginationTime = kinesisClientRecord.approximateArrivalTimestamp();
event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
event.getMetadata().setExternalOriginationTime(externalOriginationTime);
});
}
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;
package org.opensearch.dataprepper.plugins.kinesis.source.converter;

public class MetadataKeyAttributes {
static final String KINESIS_STREAM_NAME_METADATA_ATTRIBUTE = "kinesis_stream_name";
public static final String KINESIS_STREAM_NAME_METADATA_ATTRIBUTE = "stream_name";
public static final String KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key";
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
Expand Down Expand Up @@ -162,15 +161,13 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {

// Track the records for checkpoint purpose
kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer());
List<Record<Event>> records = kinesisRecordConverter.convert(processRecordsInput.records());
List<Record<Event>> records = kinesisRecordConverter.convert(processRecordsInput.records(), streamIdentifier.streamName());

int eventCount = 0;
for (Record<Event> record: records) {
Event event = record.getData();
acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event));
EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE,
streamIdentifier.streamName().toLowerCase());

bufferAccumulator.add(record);
eventCount++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.mockito.Mockito.verify;

public class KinesisRecordConverterTest {
private static final String streamId = "stream-1";

@Test
void setup() throws IOException {
Expand All @@ -52,7 +53,7 @@ void setup() throws IOException {
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
.data(ByteBuffer.wrap(sample_record_data.getBytes()))
.build();
kinesisRecordConverter.convert(List.of(kinesisClientRecord));
kinesisRecordConverter.convert(List.of(kinesisClientRecord), streamId);
verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class));
}

Expand All @@ -79,7 +80,7 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
.data(ByteBuffer.wrap(writer.toString().getBytes()))
.build();
List<Record<Event>> events = kinesisRecordConverter.convert(List.of(kinesisClientRecord));
List<Record<Event>> events = kinesisRecordConverter.convert(List.of(kinesisClientRecord), streamId);

assertEquals(events.size(), numRecords);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter;
import org.opensearch.dataprepper.plugins.kinesis.source.converter.MetadataKeyAttributes;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
Expand Down Expand Up @@ -189,9 +190,10 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied()

List<Record<Event>> records = new ArrayList<>();
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -235,9 +237,10 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled()

List<Record<Event>> records = new ArrayList<>();
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -285,9 +288,10 @@ void testProcessRecordsWithAcknowledgementsEnabled()

List<Record<Event>> records = new ArrayList<>();
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -339,9 +343,10 @@ void testProcessRecordsWithNDJsonInputCodec()

List<Record<Event>> records = new ArrayList<>();
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
Expand Down Expand Up @@ -381,9 +386,10 @@ void testProcessRecordsNoThrowException()

List<Record<Event>> records = new ArrayList<>();
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
final Throwable exception = mock(RuntimeException.class);
doThrow(exception).when(bufferAccumulator).add(any(Record.class));

Expand All @@ -405,9 +411,10 @@ void testProcessRecordsBufferFlushNoThrowException()

List<Record<Event>> records = new ArrayList<>();
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId);
Record<Event> record = new Record<>(event);
records.add(record);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords))).thenReturn(records);
when(kinesisRecordConverter.convert(eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
final Throwable exception = mock(RuntimeException.class);
doThrow(exception).when(bufferAccumulator).flush();

Expand Down

0 comments on commit 5ffc738

Please sign in to comment.