-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aggregator processor should evaluate aggregate_when condition before forwarding events to remote peer #4004
Aggregator processor should evaluate aggregate_when condition before forwarding events to remote peer #4004
Conversation
…forwarding events to remote peer Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@@ -79,7 +80,9 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina | |||
|
|||
@Override | |||
public Collection<Record<Event>> execute(final Collection<Record<Event>> records) { | |||
final Collection<Record<Event>> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(records); | |||
final Collection<Record<Event>> recordsToProcess = ((RequiresPeerForwarding)innerProcessor).applicableEventsForPeerForwarding(records); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the end of this method, we return:
return innerProcessor.execute(recordsToProcessLocally);
The collection returned by the processor will be the input into the next processor. With this PR, the returned collection is going to be inaccurate when aggregate_when
is present. The pipeline author still wants to process those events in downstream processors.
We need something like the following. I'll use +
for set union and -
for set difference. This should convey the idea, but these are not valid set operators in Java, so you'll need a little modification. Also, Collection
is not necessarily a Set
.
return innerProcessor.execute(recordsToProcessLocally) + (records - recordsToProcess);
We'll definitely need a unit test here.
It would be ideal to also have a core integration test.
final Collection<Record<Event>> recordsOut = new ArrayList<>(); | ||
for (Record<Event> record: records) { | ||
Event event = record.getData(); | ||
if (expressionEvaluator.evaluateConditional(whenCondition, event)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's share the same logic here as we use in the execute
method to ensure they remain consistent.
Maybe you could make a method that evaluates the conditional in both.
private boolean isEventApplicable(Record<Event> record)
} | ||
|
||
@Test | ||
void testRequiresPeerForwardingTest() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for adding this test!
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@kkondaka , I see some test failures:
|
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making this improvement!
Description
Aggregate processor support
aggregate_when
option which may evaluate to false to some events. But the current remote forwarder interface evaluates this condition after an event is forwarded to a remote peer which is unnecessary. The condition can be evaluated locally on a forwarding node before actually forwarding the event.Modified the remote forwarding interface to allow local evaluation of events before forwarding events to remote peer.
Resolves #3996
Issues Resolved
Resolves #3996
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.