Retry on 50x from AWS (#11)
* Retry on 50x from AWS

- Add DynamoRetriableException to group all exceptions on which a retry should be invoked (500, 502, 503, 504).
- Add a warning when a failure happens but a retry will be invoked.

* Document retry mechanism

* Retry Refinements

- Update README, including the version of the update
- Add 2 additional plausibly retriable 400 issues

* Process review remarks

---------

Co-authored-by: Jesper Van Caeter <Thejipppp@users.noreply.github.com>
spangaer and Thejipppp authored Oct 18, 2024
1 parent 8f5de84 commit 1188a1a
Showing 2 changed files with 51 additions and 4 deletions.
12 changes: 12 additions & 0 deletions README.md
@@ -123,6 +123,18 @@ my-dynamodb-journal.aws-client-config.max-connections = <your value here>

Changing this number changes both the number of concurrent connections and the used thread-pool size.

## Retry behavior

This plugin uses exponential backoff when plausibly retriable errors from DynamoDB occur. This includes network glitches (50x; since Pekko 1.1.0) and throughput exceptions (400; extended in Pekko 1.1.0).

The backoff strategy is very simple.
- There is a maximum of 10 retries.
- The first retry waits 1 millisecond.
- That wait time is doubled with every subsequent retry.

This means that the last wait is about half a second and, if responses were immediate, the whole retry sequence would take about 1 second in total. In practice, each response of course adds time on top of that.
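
A minimal sketch of that schedule (illustrative only, the names below are not part of the plugin):

```scala
import scala.concurrent.duration._

// Waits before each of the 10 retries: 1 ms, 2 ms, 4 ms, ..., 512 ms.
val waits: Seq[FiniteDuration] = (0 until 10).map(n => (1 << n).millis)

val lastWait  = waits.last                // 512 ms, roughly half a second
val totalWait = waits.map(_.toMillis).sum // 1023 ms, so about 1 second in total
```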


## Compatibility with Akka versions

pekko-persistence-dynamodb is derived from [akka-persistence-dynamodb](https://github.com/akka/akka-persistence-dynamodb) v1.3.0.
@@ -13,7 +13,7 @@

package org.apache.pekko.persistence.dynamodb.journal

-import com.amazonaws.AmazonWebServiceRequest
+import com.amazonaws.{ AmazonServiceException, AmazonWebServiceRequest }
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient
import com.amazonaws.services.dynamodbv2.model._
@@ -23,6 +23,7 @@ import pekko.event.LoggingAdapter
import pekko.pattern.after
import pekko.persistence.dynamodb.{ DynamoDBConfig, Item }
import pekko.util.ccompat.JavaConverters._
import pekko.annotation.InternalApi

import java.util.{ concurrent => juc }

@@ -32,6 +33,36 @@ import scala.concurrent.duration._
case class LatencyReport(nanos: Long, retries: Int)
private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis)

/**
 * Auxiliary object to help determine whether we should retry on a given throwable.
 */
@InternalApi
private object DynamoRetriableException {
  def unapply(ex: AmazonServiceException) = {
    ex match {
      // 50x network glitches
      case _: InternalServerErrorException =>
        Some(ex)
      case ase if ase.getStatusCode >= 502 && ase.getStatusCode <= 504 =>
        // retry on more common server errors
        Some(ex)

      // 400 throughput issues
      case _: ProvisionedThroughputExceededException =>
        Some(ex)
      case _: RequestLimitExceededException =>
        // rate of on-demand requests exceeds the allowed account throughput
        // and the table cannot be scaled further
        Some(ex)
      case ase if ase.getErrorCode == "ThrottlingException" =>
        // rate of AWS requests exceeds the allowed throughput
        Some(ex)
      case _ =>
        None
    }
  }
}

trait DynamoDBHelper {

implicit val ec: ExecutionContext
@@ -57,7 +88,7 @@ trait DynamoDBHelper {
    val handler = new AsyncHandler[In, Out] {
      override def onError(ex: Exception) =
        ex match {
-         case e: ProvisionedThroughputExceededException =>
+         case DynamoRetriableException(_) =>
            p.tryFailure(ex)
          case _ =>
            val n = name
@@ -81,12 +112,16 @@
    val state = new RetryStateHolder

    lazy val retry: PartialFunction[Throwable, Future[Out]] = {
-     case _: ProvisionedThroughputExceededException if state.retries > 0 =>
+     case DynamoRetriableException(ex) if state.retries > 0 =>
        val backoff = state.backoff
        state.retries -= 1
        state.backoff *= 2
        log.warning("failure while executing {} but will retry! Message: {}", name, ex.getMessage())
        after(backoff, scheduler)(sendSingle().recoverWith(retry))
-     case other => Future.failed(other)
+     case other: DynamoDBJournalFailure => Future.failed(other)
+     case other =>
+       val n = name
+       Future.failed(new DynamoDBJournalFailure("failed retry " + n, other))
    }

if (Tracing) log.debug("{}", name)
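
For a rough sense of how the new extractor is used, here is a hypothetical snippet (not part of the commit; `DynamoRetriableException` is internal to the journal package, so this only illustrates the matching logic):

```scala
import com.amazonaws.AmazonServiceException

// Hypothetical error shaped like an AWS throttling response.
val throttled = new AmazonServiceException("Rate of requests exceeds the allowed throughput")
throttled.setStatusCode(400)
throttled.setErrorCode("ThrottlingException")

throttled match {
  case DynamoRetriableException(e) =>
    // matched by the extractor: the retry partial function backs off and retries
    println(s"retriable: ${e.getErrorCode}")
  case other =>
    // anything else is wrapped in a DynamoDBJournalFailure and fails the operation
    println(s"not retriable: $other")
}
```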
