diff --git a/README.md b/README.md index 5854ec2..6e39c78 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,18 @@ my-dynamodb-journal.aws-client-config.max-connections = Changing this number changes both the number of concurrent connections and the used thread-pool size. +## Retry behavior + +This plugin uses exponential backoff when plausible and retriable errors from DynamoDB occur. This includes network glitches (50x; since Pekko 1.1.0) and throughput exceptions (400; extended in Pekko 1.1.0). + +The backoff strategy is very simple. +- There are a maximum of 10 retries. +- The first time it retries, it takes 1 millisecond. +- That time is doubled with every retry. + +This means that the last waiting time would be about half a second, and if responses would be immediate, the total retrial process would take about 1 second. In practice, response time would be more than 0 of course. + + ## Compatibility with Akka versions pekko-persistence-dynamodb is derived from [akka-persistence-dynamodb](https://github.com/akka/akka-persistence-dynamodb) v1.3.0. diff --git a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala index 5dd3bb7..e48e2b0 100644 --- a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala +++ b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala @@ -13,7 +13,7 @@ package org.apache.pekko.persistence.dynamodb.journal -import com.amazonaws.AmazonWebServiceRequest +import com.amazonaws.{ AmazonServiceException, AmazonWebServiceRequest } import com.amazonaws.handlers.AsyncHandler import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient import com.amazonaws.services.dynamodbv2.model._ @@ -23,6 +23,7 @@ import pekko.event.LoggingAdapter import pekko.pattern.after import pekko.persistence.dynamodb.{ DynamoDBConfig, Item } import pekko.util.ccompat.JavaConverters._ +import pekko.annotation.InternalApi import java.util.{ concurrent => juc } @@ -32,6 +33,36 @@ import scala.concurrent.duration._ case class LatencyReport(nanos: Long, retries: Int) private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis) +/** + * Auxiliary object to help determining whether we should retry on a certain throwable. + */ +@InternalApi +private object DynamoRetriableException { + def unapply(ex: AmazonServiceException) = { + ex match { + // 50x network glitches + case _: InternalServerErrorException => + Some(ex) + case ase if ase.getStatusCode >= 502 && ase.getStatusCode <= 504 => + // retry on more common server errors + Some(ex) + + // 400 throughput issues + case _: ProvisionedThroughputExceededException => + Some(ex) + case _: RequestLimitExceededException => + // rate of on-demand requests exceeds the allowed account throughput + // and the table cannot be scaled further + Some(ex) + case ase if ase.getErrorCode == "ThrottlingException" => + // rate of AWS requests exceeds the allowed throughput + Some(ex) + case _ => + None + } + } +} + trait DynamoDBHelper { implicit val ec: ExecutionContext @@ -57,7 +88,7 @@ trait DynamoDBHelper { val handler = new AsyncHandler[In, Out] { override def onError(ex: Exception) = ex match { - case e: ProvisionedThroughputExceededException => + case DynamoRetriableException(_) => p.tryFailure(ex) case _ => val n = name @@ -81,12 +112,16 @@ trait DynamoDBHelper { val state = new RetryStateHolder lazy val retry: PartialFunction[Throwable, Future[Out]] = { - case _: ProvisionedThroughputExceededException if state.retries > 0 => + case DynamoRetriableException(ex) if state.retries > 0 => val backoff = state.backoff state.retries -= 1 state.backoff *= 2 + log.warning("failure while executing {} but will retry! Message: {}", name, ex.getMessage()) after(backoff, scheduler)(sendSingle().recoverWith(retry)) - case other => Future.failed(other) + case other: DynamoDBJournalFailure => Future.failed(other) + case other => + val n = name + Future.failed(new DynamoDBJournalFailure("failed retry " + n, other)) } if (Tracing) log.debug("{}", name)