Skip to content

Commit

Permalink
Aggregate Processor: local mode should work when there is no when con…
Browse files Browse the repository at this point in the history
…dition (#4380) (#4381)

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
(cherry picked from commit 2d84ba9)

Co-authored-by: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com>
  • Loading branch information
opensearch-trigger-bot[bot] and kkondaka authored Apr 1, 2024
1 parent c63730b commit 1860d54
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ public void shutdown() {

@Override
public boolean isApplicableEventForPeerForwarding(Event event) {
if (whenCondition == null) {
return true;
}
if (localMode) {
return false;
}
if (whenCondition == null) {
return true;
}
return expressionEvaluator.evaluateConditional(whenCondition, event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class AggregateProcessorTest {

@Mock
private AggregateActionResponse firstAggregateActionResponse;
@Mock
private AggregateActionResponse secondAggregateActionResponse;

@Mock
private PluginMetrics pluginMetrics;
Expand Down Expand Up @@ -311,6 +313,71 @@ void handleEvent_returning_with_condition_eliminates_one_record_local_only() {
verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}

@Test
void handleEvent_returning_no_condition_eliminates_one_record_local_only() {
final String eventKey = UUID.randomUUID().toString();
final String key1 = UUID.randomUUID().toString();
final String key2 = UUID.randomUUID().toString();
Event firstEvent;
Event secondEvent;
final Map<String, Object> eventMap1 = new HashMap<>();
eventMap1.put(eventKey, key1);

firstEvent = JacksonEvent.builder()
.withData(eventMap1)
.withEventType("event")
.build();

final Map<String, Object> eventMap2 = new HashMap<>();
eventMap2.put(eventKey, key2);

secondEvent = JacksonEvent.builder()
.withData(eventMap2)
.withEventType("event")
.build();


when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent))
.thenReturn(identificationKeysMap);
when(identificationKeysHasher.createIdentificationKeysMapFromEvent(secondEvent))
.thenReturn(identificationKeysMap);
when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse);
when(aggregateActionSynchronizer.handleEventForGroup(secondEvent, identificationKeysMap, aggregateGroup)).thenReturn(secondAggregateActionResponse);
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(null);
when(aggregateProcessorConfig.getLocalMode()).thenReturn(true);
final AggregateProcessor objectUnderTest = createObjectUnderTest();
when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.emptyList());
when(aggregateActionResponse.getEvent()).thenReturn(event);
when(firstAggregateActionResponse.getEvent()).thenReturn(firstEvent);
when(secondAggregateActionResponse.getEvent()).thenReturn(secondEvent);

event.toMap().put(eventKey, key1);
List<Record<Event>> recordsIn = new ArrayList<>();
recordsIn.add(new Record<Event>(firstEvent));
recordsIn.add(new Record<Event>(secondEvent));
recordsIn.add(new Record<Event>(event));
Collection<Record<Event>> c = recordsIn;
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(event), equalTo(false));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(firstEvent), equalTo(false));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(secondEvent), equalTo(false));
final List<Record<Event>> recordsOut = (List<Record<Event>>) objectUnderTest.doExecute(c);

assertThat(recordsOut.size(), equalTo(3));
assertThat(recordsOut.get(0), notNullValue());
assertThat(recordsOut.get(0).getData(), equalTo(firstEvent));
assertThat(recordsOut.get(1), notNullValue());
assertThat(recordsOut.get(1).getData(), equalTo(secondEvent));
assertThat(recordsOut.get(2), notNullValue());
assertThat(recordsOut.get(2).getData(), equalTo(event));

verify(actionHandleEventsDroppedCounter).increment(0);
verify(actionHandleEventsOutCounter).increment(3);
verifyNoInteractions(actionConcludeGroupEventsDroppedCounter);
verifyNoInteractions(actionConcludeGroupEventsOutCounter);

verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}

@Test
void handleEvent_returning_with_event_adds_event_to_records_out() {
final AggregateProcessor objectUnderTest = createObjectUnderTest();
Expand Down

0 comments on commit 1860d54

Please sign in to comment.