Fix race condition in data prepper sources using e2e acknowledgements (#3039)

* Fix race condition in data prepper sources using e2e acknowledgements

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fixed checkStyle error

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
kkondaka and Krishna Kondaka committed Jul 18, 2023
1 parent 8785bbf commit 1edf97c
Showing 5 changed files with 44 additions and 13 deletions.
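
All five diffs below make the same correction: a record must be registered with its acknowledgement set before it is handed to the shared buffer, never after. Here is a minimal sketch of the race, assuming nothing from Data Prepper itself; AckSet and the plain BlockingQueue are hypothetical stand-ins for AcknowledgementSet and the pipeline buffer.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class AckOrderingSketch {

        // Stand-in for an acknowledgement set: tracks how many events are pending.
        static class AckSet {
            final AtomicInteger pending = new AtomicInteger();

            void add(Object event) {
                pending.incrementAndGet();
            }

            void release(Object event) {
                // If release() runs before add(), pending underflows and the set
                // can look "complete" while an event is still in flight.
                System.out.println("released; pending now " + pending.decrementAndGet());
            }
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Object> buffer = new LinkedBlockingQueue<>(); // stand-in for the pipeline buffer
            AckSet ackSet = new AckSet();
            Object event = new Object();

            Thread consumer = new Thread(() -> {
                try {
                    Object taken = buffer.take(); // can run the instant the event is buffered...
                    ackSet.release(taken);        // ...before the source ever calls ackSet.add()
                } catch (InterruptedException ignored) {
                }
            });
            consumer.start();

            // Buggy pre-commit ordering: buffer first, acknowledgement set second.
            buffer.add(event);
            Thread.sleep(10);  // widen the race window so the demo prints "pending now -1"
            ackSet.add(event); // too late: the consumer may already have released it

            consumer.join();
            // The fix in every diff below: call ackSet.add(event) BEFORE buffer.add(event),
            // so no consumer thread can ever observe an event the set does not track.
        }
    }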
File 1 of 5: ProcessWorker.java (package org.opensearch.dataprepper.pipeline)
@@ -5,12 +5,14 @@

 package org.opensearch.dataprepper.pipeline;

+import io.micrometer.core.instrument.Counter;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.CheckpointState;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.processor.Processor;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
+import org.opensearch.dataprepper.model.event.EventHandle;
 import org.opensearch.dataprepper.pipeline.common.FutureHelper;
 import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
 import org.slf4j.Logger;
@@ -20,6 +22,7 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -28,10 +31,14 @@
 public class ProcessWorker implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ProcessWorker.class);

+    private static final String INVALID_EVENT_HANDLES = "invalidEventHandles";
     private final Buffer readBuffer;
     private final List<Processor> processors;
     private final Pipeline pipeline;
     private boolean isEmptyRecordsLogged = false;
+    private PluginMetrics pluginMetrics;
+    private final Counter invalidEventHandlesCounter;
+    private boolean acknowledgementsEnabled;

     public ProcessWorker(
             final Buffer readBuffer,
Expand All @@ -40,6 +47,9 @@ public ProcessWorker(
this.readBuffer = readBuffer;
this.processors = processors;
this.pipeline = pipeline;
this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName());
this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES);
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled();
}

@Override
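
The constructor now gives the worker its own metric scope. Assuming PluginMetrics' usual pipeline-scoped prefixing, which this diff does not show, the counter would be reported under a name along the lines of:

    // Assumed naming; only the trailing "invalidEventHandles" segment is guaranteed by the diff.
    <pipeline-name>.ProcessWorker.invalidEventHandles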
@@ -83,12 +93,14 @@ public void run() {
     }

     private void processAcknowledgements(List<Event> inputEvents, Collection outputRecords) {
-        AcknowledgementSetManager acknowledgementSetManager = pipeline.getAcknowledgementSetManager();
         Set<Event> outputEventsSet = ((ArrayList<Record<Event>>)outputRecords).stream().map(Record::getData).collect(Collectors.toSet());
         // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
-        inputEvents.forEach(event -> {
-            if (event.getEventHandle() != null && !outputEventsSet.contains(event)) {
-                acknowledgementSetManager.releaseEventReference(event.getEventHandle(), true);
+        inputEvents.forEach(event -> {
+            EventHandle eventHandle = event.getEventHandle();
+            if (Objects.nonNull(eventHandle) && !outputEventsSet.contains(event)) {
+                eventHandle.release(true);
+            } else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) {
+                invalidEventHandlesCounter.increment();
             }
         });
     }
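
The rewrite drops the detour through pipeline.getAcknowledgementSetManager(): each handle is released directly, and an event that arrives without a handle while acknowledgements are enabled is now counted instead of silently ignored. Restated as a free-standing sketch (the helper name is mine; the fields and types are the ones declared above):

    // Hedged restatement of the new per-event decision in processAcknowledgements.
    private void releaseDroppedEvents(final List<Event> inputEvents, final Set<Event> outputEventsSet) {
        for (final Event event : inputEvents) {
            final EventHandle eventHandle = event.getEventHandle();
            if (Objects.nonNull(eventHandle) && !outputEventsSet.contains(event)) {
                eventHandle.release(true);              // processors dropped the event: positive ack right here
            } else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) {
                invalidEventHandlesCounter.increment(); // an untracked event reached the pipeline: make it visible
            }
            // Otherwise the event survived into the output; its handle is released
            // downstream once a sink delivers it.
        }
    }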
@@ -109,8 +121,8 @@ private void doRun() {
         //Should Empty list from buffer should be sent to the processors? For now sending as the Stateful processors expects it.
         for (final Processor processor : processors) {
             List<Event> inputEvents = null;
-            if (pipeline.getSource().areAcknowledgementsEnabled()) {
-                inputEvents = ((ArrayList<Record<Event>>)records).stream().map(Record::getData).collect(Collectors.toList());
+            if (acknowledgementsEnabled) {
+                inputEvents = ((ArrayList<Record<Event>>)records).stream().map(Record::getData).collect(Collectors.toList());
             }
             records = processor.execute(records);
             if (inputEvents != null) {
File 2 of 5: Kafka consumer (iterateRecordPartitions)
@@ -224,10 +224,14 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
             for (ConsumerRecord<String, T> consumerRecord : partitionRecords) {
                 Record<Event> record = getRecord(consumerRecord);
                 if (record != null) {
-                    bufferAccumulator.add(record);
+                    // Always add record to acknowledgementSet before adding to
+                    // buffer because another thread may take and process
+                    // buffer contents before the event record is added
+                    // to acknowledgement set
                     if (acknowledgementSet != null) {
                         acknowledgementSet.add(record.getData());
                     }
+                    bufferAccumulator.add(record);
                 }
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
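
The same reordering is applied verbatim in the two S3 diffs and, with an extra twist, in the SQS diff below. Factored out, the shared pattern would look like this; the helper is hypothetical (the diffs introduce no such shared method), and only AcknowledgementSet.add and BufferAccumulator.add are taken from them:

    // Hypothetical helper illustrating the corrected ordering used by all four sources.
    static void trackThenBuffer(final AcknowledgementSet acknowledgementSet,
                                final BufferAccumulator<Record<Event>> bufferAccumulator,
                                final Record<Event> record) throws Exception {
        if (acknowledgementSet != null) {
            acknowledgementSet.add(record.getData()); // 1. track the event first...
        }
        bufferAccumulator.add(record);                // 2. ...then make it visible to consumers
    }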
File 3 of 5: S3 source (doParseObject)
@@ -85,10 +85,14 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3
         codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> {
             try {
                 eventConsumer.accept(record.getData(), s3ObjectReference);
-                bufferAccumulator.add(record);
+                // Always add record to acknowledgementSet before adding to
+                // buffer because another thread may take and process
+                // buffer contents before the event record is added
+                // to acknowledgement set
                 if (acknowledgementSet != null) {
                     acknowledgementSet.add(record.getData());
                 }
+                bufferAccumulator.add(record);
             } catch (final Exception e) {
                 LOG.error("Failed writing S3 objects to buffer due to: {}", e.getMessage());
             }
File 4 of 5: S3 source (parseCompleteStreamFromResponseHeader)
@@ -263,10 +263,14 @@ private void parseCompleteStreamFromResponseHeader(final AcknowledgementSet ackn
             Record<Event> eventRecord = new Record<>(JacksonLog.builder().withData(optionalNode.get()).build());
             try {
                 eventConsumer.accept(eventRecord.getData(), s3ObjectReference);
-                bufferAccumulator.add(eventRecord);
+                // Always add record to acknowledgementSet before adding to
+                // buffer because another thread may take and process
+                // buffer contents before the event record is added
+                // to acknowledgement set
                 if (acknowledgementSet != null) {
                     acknowledgementSet.add(eventRecord.getData());
                 }
+                bufferAccumulator.add(eventRecord);
             } catch (final Exception ex) {
                 LOG.error("Failed writing S3 objects to buffer due to: {}", ex.getMessage());
             }
File 5 of 5: SQS message handler (handleMessages)
@@ -42,13 +42,20 @@ public List<DeleteMessageBatchRequestEntry> handleMessages(final List<Message> m
         messages.forEach(message -> {
             final Record<Event> eventRecord = new Record<Event>(JacksonEvent.fromMessage(message.body()));
             try {
+                // Always add record to acknowledgementSet before adding to
+                // buffer because another thread may take and process
+                // buffer contents before the event record is added
+                // to acknowledgement set
+                if(Objects.nonNull(acknowledgementSet)){
+                    acknowledgementSet.add(eventRecord.getData());
+                }
                 bufferAccumulator.add(eventRecord);
             } catch (Exception e) {
+                // Exception may occur when we failed to flush. In which
+                // case, not sending acknowledgement would be correct because
+                // we need to retry
                 throw new RuntimeException(e);
             }
-            if(Objects.nonNull(acknowledgementSet)){
-                acknowledgementSet.add(eventRecord.getData());
-            }
         });
         try {
             bufferAccumulator.flush();
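
Beyond the reordering, this diff moves the acknowledgement-set registration inside the try block, which is what makes the new retry comment hold together: if the buffer add fails, the RuntimeException propagates before any positive acknowledgement exists, so no delete entry is built for the message and SQS redelivers it after the visibility timeout. A hedged condensation of that failure path, with standard SQS redelivery semantics assumed rather than shown:

    try {
        if (Objects.nonNull(acknowledgementSet)) {
            acknowledgementSet.add(eventRecord.getData()); // tracked before any consumer can see it
        }
        bufferAccumulator.add(eventRecord);                // a failed flush surfaces as an exception here
    } catch (Exception e) {
        // No acknowledgement was produced, so no DeleteMessageBatchRequestEntry is
        // created for this message; SQS redelivers it once the visibility timeout expires.
        throw new RuntimeException(e);
    }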
