From 1edf97cdb7619e5f2f2434a16f2b8ccbfc6fd965 Mon Sep 17 00:00:00 2001
From: kkondaka <41027584+kkondaka@users.noreply.github.com>
Date: Tue, 18 Jul 2023 15:53:41 -0700
Subject: [PATCH] Fix race condition in data prepper sources using e2e
 acknowledgements (#3039)

* Fix race condition in data prepper sources using e2e acknowledgements

Signed-off-by: Krishna Kondaka

* Fixed checkStyle error

Signed-off-by: Krishna Kondaka

---------

Signed-off-by: Krishna Kondaka
Co-authored-by: Krishna Kondaka
---
 .../dataprepper/pipeline/ProcessWorker.java   | 26 ++++++++++++++-----
 .../consumer/KafkaSourceCustomConsumer.java   |  6 ++++-
 .../plugins/source/S3ObjectWorker.java        |  6 ++++-
 .../plugins/source/S3SelectObjectWorker.java  |  6 ++++-
 .../handler/RawSqsMessageHandler.java         | 13 +++++++---
 5 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java
index baa23da681..56d81ba68b 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java
@@ -5,12 +5,14 @@
 
 package org.opensearch.dataprepper.pipeline;
 
+import io.micrometer.core.instrument.Counter;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.CheckpointState;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.processor.Processor;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
+import org.opensearch.dataprepper.model.event.EventHandle;
 import org.opensearch.dataprepper.pipeline.common.FutureHelper;
 import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
 import org.slf4j.Logger;
@@ -20,6 +22,7 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -28,10 +31,14 @@ public class ProcessWorker implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ProcessWorker.class);
+    private static final String INVALID_EVENT_HANDLES = "invalidEventHandles";
 
     private final Buffer readBuffer;
     private final List<Processor> processors;
     private final Pipeline pipeline;
     private boolean isEmptyRecordsLogged = false;
+    private PluginMetrics pluginMetrics;
+    private final Counter invalidEventHandlesCounter;
+    private boolean acknowledgementsEnabled;
 
     public ProcessWorker(
             final Buffer readBuffer,
@@ -40,6 +47,9 @@ public ProcessWorker(
         this.readBuffer = readBuffer;
         this.processors = processors;
         this.pipeline = pipeline;
+        this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName());
+        this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES);
+        this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled();
     }
 
     @Override
@@ -83,12 +93,14 @@ public void run() {
     }
 
     private void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
-        AcknowledgementSetManager acknowledgementSetManager = pipeline.getAcknowledgementSetManager();
         Set<Event> outputEventsSet = ((ArrayList<Record<Event>>)outputRecords).stream().map(Record::getData).collect(Collectors.toSet());
         // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
-        inputEvents.forEach(event -> {
-            if (event.getEventHandle() != null && !outputEventsSet.contains(event)) {
-                acknowledgementSetManager.releaseEventReference(event.getEventHandle(), true);
+        inputEvents.forEach(event -> {
+            EventHandle eventHandle = event.getEventHandle();
+            if (Objects.nonNull(eventHandle) && !outputEventsSet.contains(event)) {
+                eventHandle.release(true);
+            } else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) {
+                invalidEventHandlesCounter.increment();
             }
         });
     }
@@ -109,8 +121,8 @@ private void doRun() {
             //Should Empty list from buffer should be sent to the processors? For now sending as the Stateful processors expects it.
             for (final Processor processor : processors) {
                 List<Event> inputEvents = null;
-                if (pipeline.getSource().areAcknowledgementsEnabled()) {
-                    inputEvents = ((ArrayList<Record<Event>>)records).stream().map(Record::getData).collect(Collectors.toList());
+                if (acknowledgementsEnabled) {
+                    inputEvents = ((ArrayList<Record<Event>>)records).stream().map(Record::getData).collect(Collectors.toList());
                 }
                 records = processor.execute(records);
                 if (inputEvents != null) {
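
The ProcessWorker change, in prose: when a processor drops an event (it is in
the input list but absent from the output set), its handle is now released
positively on the spot via eventHandle.release(true) instead of going through
the AcknowledgementSetManager; and when acknowledgements are enabled but an
event arrives with no handle at all, the new invalidEventHandles counter
records the anomaly. Below is a minimal compilable sketch of that sweep, with
simplified stand-in types (the Event, EventHandle, and counter shapes are
assumptions for illustration, not the real Data Prepper classes):

    import java.util.List;
    import java.util.Objects;
    import java.util.Set;

    class AcknowledgementSweep {
        // Stand-ins for org.opensearch.dataprepper.model.event.* (assumed shapes).
        interface EventHandle { void release(boolean success); }
        interface Event { EventHandle getEventHandle(); }

        private final boolean acknowledgementsEnabled;
        private long invalidEventHandles; // stands in for the Micrometer Counter

        AcknowledgementSweep(final boolean acknowledgementsEnabled) {
            this.acknowledgementsEnabled = acknowledgementsEnabled;
        }

        // Positively release every input event the processors dropped; count
        // events that should carry a handle but do not.
        void sweep(final List<Event> inputEvents, final Set<Event> outputEvents) {
            for (final Event event : inputEvents) {
                final EventHandle handle = event.getEventHandle();
                if (Objects.nonNull(handle) && !outputEvents.contains(event)) {
                    handle.release(true);
                } else if (acknowledgementsEnabled && Objects.isNull(handle)) {
                    invalidEventHandles++;
                }
            }
        }
    }

Caching acknowledgementsEnabled once in the constructor also removes the
repeated pipeline.getSource().areAcknowledgementsEnabled() call from the hot
doRun() loop.
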
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
index 67505a67f4..666c2496f7 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
@@ -224,10 +224,14 @@ private void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
         for (ConsumerRecord<String, T> consumerRecord : partitionRecords) {
             Record<Event> record = getRecord(consumerRecord);
             if (record != null) {
-                bufferAccumulator.add(record);
+                // Always add record to acknowledgementSet before adding to
+                // buffer because another thread may take and process
+                // buffer contents before the event record is added
+                // to acknowledgement set
                 if (acknowledgementSet != null) {
                     acknowledgementSet.add(record.getData());
                 }
+                bufferAccumulator.add(record);
             }
         }
         long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java
index fb0bfb412a..faf5729f9e 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java
@@ -85,10 +85,14 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3
         codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> {
             try {
                 eventConsumer.accept(record.getData(), s3ObjectReference);
-                bufferAccumulator.add(record);
+                // Always add record to acknowledgementSet before adding to
+                // buffer because another thread may take and process
+                // buffer contents before the event record is added
+                // to acknowledgement set
                 if (acknowledgementSet != null) {
                     acknowledgementSet.add(record.getData());
                 }
+                bufferAccumulator.add(record);
             } catch (final Exception e) {
                 LOG.error("Failed writing S3 objects to buffer due to: {}", e.getMessage());
             }
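
The Kafka and S3 hunks above, and the S3 Select and SQS hunks that follow, all
enforce the same invariant: register the event with the acknowledgement set
before handing it to the buffer, because a downstream thread may drain the
buffer and release the event before the source thread reaches the
registration. A hedged, self-contained sketch of that invariant (AckSet and
the queue below are simplified stand-ins, not Data Prepper classes; only the
ordering matters):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    class OrderingSketch {
        static class AckSet {
            private final AtomicInteger pending = new AtomicInteger();
            void add(String event) { pending.incrementAndGet(); }
            void release(String event) {
                // If release() could run before add(), pending would dip below
                // zero and the set could look complete while events are still
                // in flight; that is the race the patch closes.
                pending.decrementAndGet();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
            final AckSet ackSet = new AckSet();

            // Consumer thread: may take the event the instant it is buffered.
            final Thread consumer = new Thread(() -> {
                try {
                    ackSet.release(buffer.take()); // processed, so acknowledge
                } catch (final InterruptedException ignored) {
                }
            });
            consumer.start();

            final String event = "event-1";
            ackSet.add(event);  // register FIRST ...
            buffer.put(event);  // ... THEN hand off to the consumer
            consumer.join();
        }
    }
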
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java
index c7fc957350..84993f92a5 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java
@@ -263,10 +263,14 @@ private void parseCompleteStreamFromResponseHeader(final AcknowledgementSet ackn
             Record<Event> eventRecord = new Record<>(JacksonLog.builder().withData(optionalNode.get()).build());
             try {
                 eventConsumer.accept(eventRecord.getData(), s3ObjectReference);
-                bufferAccumulator.add(eventRecord);
+                // Always add record to acknowledgementSet before adding to
+                // buffer because another thread may take and process
+                // buffer contents before the event record is added
+                // to acknowledgement set
                 if (acknowledgementSet != null) {
                     acknowledgementSet.add(eventRecord.getData());
                 }
+                bufferAccumulator.add(eventRecord);
             } catch (final Exception ex) {
                 LOG.error("Failed writing S3 objects to buffer due to: {}", ex.getMessage());
             }
diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/handler/RawSqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/handler/RawSqsMessageHandler.java
index 091e87a61e..844eecb97b 100644
--- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/handler/RawSqsMessageHandler.java
+++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/handler/RawSqsMessageHandler.java
@@ -42,13 +42,20 @@ public List<DeleteMessageBatchRequestEntry> handleMessages(final List<Message> m
         messages.forEach(message -> {
             final Record<Event> eventRecord = new Record<Event>(JacksonEvent.fromMessage(message.body()));
             try {
+                // Always add record to acknowledgementSet before adding to
+                // buffer because another thread may take and process
+                // buffer contents before the event record is added
+                // to acknowledgement set
+                if(Objects.nonNull(acknowledgementSet)){
+                    acknowledgementSet.add(eventRecord.getData());
+                }
                 bufferAccumulator.add(eventRecord);
             } catch (Exception e) {
+                // Exception may occur when we failed to flush. In which
+                // case, not sending acknowledgement would be correct because
+                // we need to retry
                 throw new RuntimeException(e);
             }
-            if(Objects.nonNull(acknowledgementSet)){
-                acknowledgementSet.add(eventRecord.getData());
-            }
         });
         try {
             bufferAccumulator.flush();
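
The SQS handler differs from the other sources in one respect: registration
with the acknowledgement set moves inside the try block ahead of the buffer
add, so a failed add (for example, a failed flush) propagates as a
RuntimeException before any positive acknowledgement can complete, which
leaves the SQS message undeleted and therefore eligible for redelivery. A
compact sketch of those retry semantics, using simplified stand-in types (the
AckSet interface and the Consumer-based accumulator are assumptions for
illustration, not the plugin's real classes):

    import java.util.List;
    import java.util.Objects;
    import java.util.function.Consumer;

    class SqsHandOffSketch {
        // Stand-in for AcknowledgementSet (assumed shape).
        interface AckSet { void add(String event); }

        static void handleMessages(final List<String> messageBodies,
                                   final Consumer<String> bufferAccumulator,
                                   final AckSet acknowledgementSet) {
            for (final String body : messageBodies) {
                try {
                    if (Objects.nonNull(acknowledgementSet)) {
                        acknowledgementSet.add(body); // register before buffering
                    }
                    bufferAccumulator.accept(body);   // may throw on a failed flush
                } catch (final Exception e) {
                    // Do not acknowledge: surfacing the failure means the SQS
                    // message is never deleted, so the queue redelivers it.
                    throw new RuntimeException(e);
                }
            }
        }
    }

Under the old ordering, the acknowledgement ran only after
bufferAccumulator.add() returned, so a concurrent flush and release could
complete before the event was ever registered; the new ordering closes that
window.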