-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lambda invoke backoff #5195
base: main
Are you sure you want to change the base?
Lambda invoke backoff #5195
Conversation
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
final Event event = record.getData(); | ||
|
||
// If the condition is false, add the event to resultRecords as-is | ||
if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When condition is not applicable to sink. I guess you want to send null for sinks.
Consumer<InvokeResponse> successHandler, | ||
Consumer<Throwable> failureHandler) { | ||
final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE) | ||
.withMaxAttempts(Integer.MAX_VALUE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxAttempts should be configurable
} else if (throwable instanceof RequestTooLargeException) { | ||
//We should possibly split this out into more chunks | ||
//Ideally, we shouldn't get into here | ||
LOG.error("Request too large exception, please check your payload size", throwable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also should call failureHandler.accept()
final long delayMillis = backoff.nextDelayMillis(failedAttemptCount.get()); | ||
if (delayMillis < 0) { | ||
Thread.currentThread().interrupt(); | ||
throw new S3RetriesExhaustedException("Lambda retries exhausted. Current Lambda concurrency setting is either too small or too busy."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy paste issue - S3RetriesExhaustedException
. Rename it
|
||
LOG.info("Batch Chunks created after threshold check: {}", batchedBuffers.size()); | ||
for (Buffer buffer : batchedBuffers) { | ||
InvokeRequest requestPayload = buffer.getRequestPayload(functionName, invocationType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is duplicated between processor and sink
// Release event handles for records not sent to Lambda | ||
for (Record<Event> record : records) { | ||
Event event = record.getData(); | ||
releaseEventHandle(event, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Failed records should be sent to DLQ and if it fails there then release should happen with false
(negative ack)
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Description
Reusing the code between lambda sink and lambda processor
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.