Skip to content

Commit

Permalink
Fix PeerForwardingProcessorDecorator to process records locally when …
Browse files Browse the repository at this point in the history
…exclude identification keys is set (#5178)

* Fix PeerForwardingProcessorDecorator to process records locally when exclude identification keys is set

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka authored Nov 8, 2024
1 parent 60990cd commit 347a803
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ public Collection<Record<Event>> execute(final Collection<Record<Event>> records
final Collection<Record<Event>> recordsSkipped = new ArrayList<>();
for (Record<Event> record: records) {
if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) {
recordsToProcess.add(record);
if (isPeerForwardingDisabled()) {
recordsToProcessLocally.add(record);
} else {
recordsToProcess.add(record);
}
} else if (((RequiresPeerForwarding)innerProcessor).isForLocalProcessingOnly(record.getData())){
recordsToProcessLocally.add(record);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,45 @@ void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentific
verifyNoMoreInteractions(peerForwarderProvider);
}

@Test
void PeerForwardingProcessingDecorator_with_localProcessingOnlyWithExcludeIdentificationKeys() {
List<Processor> objectsUnderTest = new ArrayList<>();
Processor innerProcessor1 = (Processor)requiresPeerForwarding;
Processor innerProcessor2 = (Processor)requiresPeerForwardingCopy;
objectsUnderTest.add(innerProcessor1);
objectsUnderTest.add(innerProcessor2);

LocalPeerForwarder localPeerForwarder = mock(LocalPeerForwarder.class);
when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder);
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
List<Record<Event>> testData = Collections.singletonList(record);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(true);

when(innerProcessor1.execute(testData)).thenReturn(testData);
when(innerProcessor2.execute(testData)).thenReturn(testData);

when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(identificationKeys);

final List<Processor> processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(objectsUnderTest, Set.of(identificationKeys));
assertThat(processors.size(), equalTo(2));
for (final Processor processor: processors) {
assertTrue(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());
}
verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
Collection<Record<Event>> result = processors.get(0).execute(testData);
assertThat(result.size(), equalTo(testData.size()));
assertThat(result, equalTo(testData));
result = processors.get(1).execute(testData);
assertThat(result.size(), equalTo(testData.size()));
assertThat(result, equalTo(testData));
}


@Test
void PeerForwardingProcessingDecorator_with_localProcessingOnly() {
List<Processor> processorList = new ArrayList<>();
Expand Down

0 comments on commit 347a803

Please sign in to comment.