Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in data prepper sources using e2e acknowledgements #3039

Merged
merged 2 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

package org.opensearch.dataprepper.pipeline;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
Expand All @@ -20,6 +23,7 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
Expand All @@ -28,10 +32,14 @@
public class ProcessWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ProcessWorker.class);

private static final String INVALID_EVENT_HANDLES = "invalidEventHandles";
private final Buffer readBuffer;
private final List<Processor> processors;
private final Pipeline pipeline;
private boolean isEmptyRecordsLogged = false;
private PluginMetrics pluginMetrics;
private final Counter invalidEventHandlesCounter;
private boolean acknowledgementsEnabled;

public ProcessWorker(
final Buffer readBuffer,
Expand All @@ -40,6 +48,9 @@ public ProcessWorker(
this.readBuffer = readBuffer;
this.processors = processors;
this.pipeline = pipeline;
this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName());
this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES);
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled();
}

@Override
Expand Down Expand Up @@ -83,12 +94,14 @@ public void run() {
}

private void processAcknowledgements(List<Event> inputEvents, Collection outputRecords) {
AcknowledgementSetManager acknowledgementSetManager = pipeline.getAcknowledgementSetManager();
Set<Event> outputEventsSet = ((ArrayList<Record<Event>>)outputRecords).stream().map(Record::getData).collect(Collectors.toSet());
// For each event in the input events list that is not present in the output events, send a positive acknowledgement if acknowledgements are enabled for it
inputEvents.forEach(event -> {
if (event.getEventHandle() != null && !outputEventsSet.contains(event)) {
acknowledgementSetManager.releaseEventReference(event.getEventHandle(), true);
inputEvents.forEach(event -> {
EventHandle eventHandle = event.getEventHandle();
if (Objects.nonNull(eventHandle) && !outputEventsSet.contains(event)) {
eventHandle.release(true);
} else if (acknowledgementsEnabled && Objects.isNull(eventHandle)) {
invalidEventHandlesCounter.increment();
}
});
}
Expand All @@ -109,8 +122,8 @@ private void doRun() {
//Should an empty list from the buffer be sent to the processors? For now it is sent, since stateful processors expect it.
for (final Processor processor : processors) {
List<Event> inputEvents = null;
if (pipeline.getSource().areAcknowledgementsEnabled()) {
inputEvents = ((ArrayList<Record<Event>>)records).stream().map(Record::getData).collect(Collectors.toList());
if (acknowledgementsEnabled) {
inputEvents = ((ArrayList<Record<Event>>)records).stream().map(Record::getData).collect(Collectors.toList());
}
records = processor.execute(records);
if (inputEvents != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,14 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
for (ConsumerRecord<String, T> consumerRecord : partitionRecords) {
Record<Event> record = getRecord(consumerRecord);
if (record != null) {
bufferAccumulator.add(record);
// Always add record to acknowledgementSet before adding to
// buffer because another thread may take and process
// buffer contents before the event record is added
// to acknowledgement set
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
}
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3
codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> {
try {
eventConsumer.accept(record.getData(), s3ObjectReference);
bufferAccumulator.add(record);
// Always add record to acknowledgementSet before adding to
// buffer because another thread may take and process
// buffer contents before the event record is added
// to acknowledgement set
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
} catch (final Exception e) {
LOG.error("Failed writing S3 objects to buffer due to: {}", e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,14 @@ private void parseCompleteStreamFromResponseHeader(final AcknowledgementSet ackn
Record<Event> eventRecord = new Record<>(JacksonLog.builder().withData(optionalNode.get()).build());
try {
eventConsumer.accept(eventRecord.getData(), s3ObjectReference);
bufferAccumulator.add(eventRecord);
// Always add record to acknowledgementSet before adding to
// buffer because another thread may take and process
// buffer contents before the event record is added
// to acknowledgement set
if (acknowledgementSet != null) {
acknowledgementSet.add(eventRecord.getData());
}
bufferAccumulator.add(eventRecord);
} catch (final Exception ex) {
LOG.error("Failed writing S3 objects to buffer due to: {}", ex.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,20 @@ public List<DeleteMessageBatchRequestEntry> handleMessages(final List<Message> m
messages.forEach(message -> {
final Record<Event> eventRecord = new Record<Event>(JacksonEvent.fromMessage(message.body()));
try {
// Always add record to acknowledgementSet before adding to
// buffer because another thread may take and process
// buffer contents before the event record is added
// to acknowledgement set
if(Objects.nonNull(acknowledgementSet)){
acknowledgementSet.add(eventRecord.getData());
}
bufferAccumulator.add(eventRecord);
} catch (Exception e) {
// An exception may occur when we fail to flush. In that
// case, not sending an acknowledgement is correct because
// we need to retry
throw new RuntimeException(e);
}
if(Objects.nonNull(acknowledgementSet)){
acknowledgementSet.add(eventRecord.getData());
}
});
try {
bufferAccumulator.flush();
Expand Down