Skip to content

Commit

Permalink
Retry on 50x from AWS
Browse files Browse the repository at this point in the history
- Add DynamoRetriableException to group all exceptions on which retry should be invoked (500,502,503,504).
- Add warning when a failure happened, but retry will be invoked.
  • Loading branch information
Thejipppp authored and spangaer committed Jan 9, 2024
1 parent 2105f52 commit a4448c7
Showing 1 changed file with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.apache.pekko.persistence.dynamodb.journal

import com.amazonaws.AmazonWebServiceRequest
import com.amazonaws.{ AmazonServiceException, AmazonWebServiceRequest }
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient
import com.amazonaws.services.dynamodbv2.model._
Expand All @@ -32,6 +32,25 @@ import scala.concurrent.duration._
case class LatencyReport(nanos: Long, retries: Int)
private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis)

/**
* Auxiliary object to help determining whether we should retry on a certain throwable.
*/
object DynamoRetriableException {
def unapply(ex: AmazonServiceException) = {
ex match {
case _: ProvisionedThroughputExceededException =>
Some(ex)
case _: InternalServerErrorException =>
Some(ex)
case ase if ase.getStatusCode >= 502 && ase.getStatusCode <= 504 =>
// retry on more common server errors
Some(ex)
case _ =>
None
}
}
}

trait DynamoDBHelper {

implicit val ec: ExecutionContext
Expand All @@ -57,7 +76,7 @@ trait DynamoDBHelper {
val handler = new AsyncHandler[In, Out] {
override def onError(ex: Exception) =
ex match {
case e: ProvisionedThroughputExceededException =>
case DynamoRetriableException(_) =>
p.tryFailure(ex)
case _ =>
val n = name
Expand All @@ -81,12 +100,16 @@ trait DynamoDBHelper {
val state = new RetryStateHolder

lazy val retry: PartialFunction[Throwable, Future[Out]] = {
case _: ProvisionedThroughputExceededException if state.retries > 0 =>
case DynamoRetriableException(ex) if state.retries > 0 =>
val backoff = state.backoff
state.retries -= 1
state.backoff *= 2
log.warning("failure while executing {} but will retry! Message: {}", name, ex.getMessage())
after(backoff, scheduler)(sendSingle().recoverWith(retry))
case other => Future.failed(other)
case other: DynamoDBJournalFailure => Future.failed(other)
case other =>
val n = name
Future.failed(new DynamoDBJournalFailure("failed retry " + n, other))
}

if (Tracing) log.debug("{}", name)
Expand Down

0 comments on commit a4448c7

Please sign in to comment.