Skip to content

Commit

Permalink
Aggregator processor should evaluate aggregate_when condition before …
Browse files Browse the repository at this point in the history
…forwarding events to remote peer (#4004)

Aggregator processor should evaluate aggregate_when condition before forwarding events to remote peer

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka committed Jan 25, 2024
1 parent bb132af commit 2be8166
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

package org.opensearch.dataprepper.model.peerforwarder;
import org.opensearch.dataprepper.model.event.Event;

import java.util.Collection;

Expand All @@ -18,4 +19,15 @@ public interface RequiresPeerForwarding {
* @return A set of keys
*/
Collection<String> getIdentificationKeys();

/**
* Determines if an event should be forwarded to the remote peer
*
* @param event input event
*
* @return true if the event should be forwarded to the peer
*/
default boolean isApplicableEventForPeerForwarding(Event event) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.peerforwarder;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

import java.util.Collection;

class RequiresPeerForwardingTest {

public class SimpleRequiresPeerForwarding implements RequiresPeerForwarding {
@Override
public Collection<String> getIdentificationKeys() {
return null;
}
}

@Test
void testRequiresPeerForwardingTest() {
Event event = mock(Event.class);
RequiresPeerForwarding requiresPeerForwarding = new SimpleRequiresPeerForwarding();
assertThat(requiresPeerForwarding.isApplicableEventForPeerForwarding(event), equalTo(true));
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -79,13 +80,25 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina

@Override
public Collection<Record<Event>> execute(final Collection<Record<Event>> records) {
final Collection<Record<Event>> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(records);
final Collection<Record<Event>> recordsToProcess = new ArrayList<>();
final Collection<Record<Event>> recordsSkipped = new ArrayList<>();
for (Record<Event> record: records) {
if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) {
recordsToProcess.add(record);
} else {
recordsSkipped.add(record);
}
}
final Collection<Record<Event>> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(recordsToProcess);

final Collection<Record<Event>> receivedRecordsFromBuffer = peerForwarder.receiveRecords();

final Collection<Record<Event>> recordsToProcessLocally = CollectionUtils.union(
recordsToProcessOnLocalPeer, receivedRecordsFromBuffer);

return innerProcessor.execute(recordsToProcessLocally);
Collection<Record<Event>> recordsOut = innerProcessor.execute(recordsToProcessLocally);
recordsOut.addAll(recordsSkipped);
return recordsOut;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ public void run() {
}
}

private void processAcknowledgements(List<Event> inputEvents, Collection outputRecords) {
Set<Event> outputEventsSet = ((ArrayList<Record<Event>>)outputRecords).stream().map(Record::getData).collect(Collectors.toSet());
private void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet());
// For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
inputEvents.forEach(event -> {
EventHandle eventHandle = event.getEventHandle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.ArgumentMatchers.anyCollection;
Expand Down Expand Up @@ -97,7 +98,6 @@ void decorateProcessors_with_different_identification_key_should_throw() {
assertThrows(RuntimeException.class, () -> createObjectUnderTesDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy)));
}


@Test
void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() {
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.emptyList());
Expand Down Expand Up @@ -129,9 +129,12 @@ void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentific

@Test
void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correct_values() {
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
List<Record<Event>> testData = Collections.singletonList(record);

when(peerForwarder.forwardRecords(testData)).thenReturn(testData);
when(peerForwarder.forwardRecords(anyCollection())).thenReturn(testData);

when(processor.execute(testData)).thenReturn(testData);

Expand All @@ -140,18 +143,21 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc
final Collection<Record<Event>> records = processors.get(0).execute(testData);

verify(requiresPeerForwarding, times(2)).getIdentificationKeys();
verify(peerForwarder).forwardRecords(testData);
verify(peerForwarder).forwardRecords(anyCollection());
Assertions.assertNotNull(records);
assertThat(records.size(), equalTo(testData.size()));
assertThat(records, equalTo(testData));
}

@Test
void PeerForwardingProcessingDecorator_execute_should_receiveRecords() {
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
Collection<Record<Event>> forwardTestData = Collections.singletonList(record);
Collection<Record<Event>> receiveTestData = Collections.singletonList(mock(Record.class));

when(peerForwarder.forwardRecords(forwardTestData)).thenReturn(forwardTestData);
when(peerForwarder.forwardRecords(anyCollection())).thenReturn(forwardTestData);
when(peerForwarder.receiveRecords()).thenReturn(receiveTestData);

final Collection<Record<Event>> expectedRecordsToProcessLocally = CollectionUtils.union(forwardTestData, receiveTestData);
Expand All @@ -163,7 +169,7 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() {
final Collection<Record<Event>> records = processors.get(0).execute(forwardTestData);

verify(requiresPeerForwarding, times(2)).getIdentificationKeys();
verify(peerForwarder).forwardRecords(forwardTestData);
verify(peerForwarder).forwardRecords(anyCollection());
verify(peerForwarder).receiveRecords();
Assertions.assertNotNull(records);
assertThat(records.size(), equalTo(expectedRecordsToProcessLocally.size()));
Expand All @@ -172,6 +178,9 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() {

@Test
void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execute() {
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
Collection<Record<Event>> testData = Collections.singletonList(record);

Expand All @@ -180,6 +189,38 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut
verify(processor).execute(anyCollection());
}

@Test
void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_overridden() {
Event event1 = mock(Event.class);
Event event2 = mock(Event.class);
Event event3 = mock(Event.class);
Record record1 = mock(Record.class);
Record record2 = mock(Record.class);
Record record3 = mock(Record.class);
Record aggregatedRecord = mock(Record.class);
List<Record> aggregatedRecords = new ArrayList<>();
aggregatedRecords.add(aggregatedRecord);
when(processor.execute(anyCollection())).thenReturn(aggregatedRecords);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event1)).thenReturn(true);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(true);
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
when(record1.getData()).thenReturn(event1);
when(record2.getData()).thenReturn(event2);
when(record3.getData()).thenReturn(event3);
Collection<Record<Event>> recordsIn = new ArrayList<>();
recordsIn.add(record1);
recordsIn.add(record2);
recordsIn.add(record3);

assertThat(processors.size(), equalTo(1));
Collection<Record<Event>> recordsOut = processors.get(0).execute(recordsIn);
verify(processor).execute(anyCollection());
assertThat(recordsOut.size(), equalTo(2));
assertTrue(recordsOut.contains(aggregatedRecord));
assertTrue(recordsOut.contains(record2));
}

@Test
void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_processors_prepareForShutdown() {
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
Expand Down Expand Up @@ -208,4 +249,4 @@ void PeerForwardingProcessingDecorator_shutdown_will_call_inner_processors_shutd
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ public void shutdown() {

}

@Override
public boolean isApplicableEventForPeerForwarding(Event event) {
if (whenCondition == null) {
return true;
}
return expressionEvaluator.evaluateConditional(whenCondition, event);
}

@Override
public Collection<String> getIdentificationKeys() {
return aggregateProcessorConfig.getIdentificationKeys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ void handleEvent_returning_with_condition_eliminates_one_record() {
recordsIn.add(new Record<Event>(secondEvent));
recordsIn.add(new Record<Event>(event));
Collection<Record<Event>> c = recordsIn;
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(event), equalTo(true));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(firstEvent), equalTo(true));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(secondEvent), equalTo(false));
final List<Record<Event>> recordsOut = (List<Record<Event>>) objectUnderTest.doExecute(c);

assertThat(recordsOut.size(), equalTo(2));
Expand Down

0 comments on commit 2be8166

Please sign in to comment.