Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic retry policies #550

Merged
merged 4 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 39 additions & 29 deletions modules/core/shared/src/main/scala/retry/RetryPolicies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@ object RetryPolicies:

/** Don't retry at all and always give up. Only really useful for combining with other policies.
*/
def alwaysGiveUp[F[_]: Applicative]: RetryPolicy[F] =
RetryPolicy.liftWithShow(Function.const(GiveUp), "alwaysGiveUp")
def alwaysGiveUp[F[_]: Applicative]: RetryPolicy[F, Any] =
RetryPolicy.liftWithShow((_, _) => GiveUp, "alwaysGiveUp")

/** Delay by a constant amount before each retry. Never give up.
*/
def constantDelay[F[_]: Applicative](delay: FiniteDuration): RetryPolicy[F] =
def constantDelay[F[_]: Applicative](delay: FiniteDuration): RetryPolicy[F, Any] =
RetryPolicy.liftWithShow(
Function.const(DelayAndRetry(delay)),
(_, _) => DelayAndRetry(delay),
show"constantDelay($delay)"
)

/** Each delay is twice as long as the previous one. Never give up.
*/
def exponentialBackoff[F[_]: Applicative](
baseDelay: FiniteDuration
): RetryPolicy[F] =
): RetryPolicy[F, Any] =
RetryPolicy.liftWithShow(
{ status =>
{ (_, status) =>
val delay =
safeMultiply(
baseDelay,
Expand All @@ -62,9 +62,9 @@ object RetryPolicies:

/** Retry without delay, giving up after the given number of retries.
*/
def limitRetries[F[_]: Applicative](maxRetries: Int): RetryPolicy[F] =
def limitRetries[F[_]: Applicative](maxRetries: Int): RetryPolicy[F, Any] =
RetryPolicy.liftWithShow(
{ status =>
{ (_, status) =>
if status.retriesSoFar >= maxRetries then GiveUp
else DelayAndRetry(Duration.Zero)
},
Expand All @@ -76,11 +76,9 @@ object RetryPolicies:
* e.g. if `baseDelay` is 10 milliseconds, the delays before each retry will be 10 ms, 10 ms, 20 ms, 30ms,
* 50ms, 80ms, 130ms, ...
*/
def fibonacciBackoff[F[_]: Applicative](
baseDelay: FiniteDuration
): RetryPolicy[F] =
def fibonacciBackoff[F[_]: Applicative](baseDelay: FiniteDuration): RetryPolicy[F, Any] =
RetryPolicy.liftWithShow(
{ status =>
{ (_, status) =>
val delay =
safeMultiply(baseDelay, Fibonacci.fibonacci(status.retriesSoFar + 1))
DelayAndRetry(delay)
Expand All @@ -91,9 +89,9 @@ object RetryPolicies:
/** "Full jitter" backoff algorithm. See
* https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
*/
def fullJitter[F[_]: Applicative](baseDelay: FiniteDuration): RetryPolicy[F] =
def fullJitter[F[_]: Applicative](baseDelay: FiniteDuration): RetryPolicy[F, Any] =
RetryPolicy.liftWithShow(
{ status =>
{ (_, status) =>
val e = Math.pow(2.0, status.retriesSoFar.toDouble).toLong
val maxDelay = safeMultiply(baseDelay, e)
val delayNanos = (maxDelay.toNanos * Random.nextDouble()).toLong
Expand All @@ -104,48 +102,60 @@ object RetryPolicies:

/** Set an upper bound on any individual delay produced by the given policy.
*/
def capDelay[F[_]: Applicative](
def capDelay[F[_]: Applicative, Res](
cap: FiniteDuration,
policy: RetryPolicy[F]
): RetryPolicy[F] =
policy: RetryPolicy[F, Res]
): RetryPolicy[F, Res] =
policy.meet(constantDelay(cap))

/** Add an upper bound to a policy such that once the given time-delay amount <b>per try</b> has been
* reached or exceeded, the policy will stop retrying and give up. If you need to stop retrying once
* <b>cumulative</b> delay reaches a time-delay amount, use [[limitRetriesByCumulativeDelay]].
*/
def limitRetriesByDelay[F[_]: Applicative](
def limitRetriesByDelay[F[_]: Applicative, Res](
threshold: FiniteDuration,
policy: RetryPolicy[F]
): RetryPolicy[F] =
def decideNextRetry(status: RetryStatus): F[PolicyDecision] =
policy.decideNextRetry(status).map {
policy: RetryPolicy[F, Res]
): RetryPolicy[F, Res] =
def decideNextRetry(actionResult: Res, status: RetryStatus): F[PolicyDecision] =
policy.decideNextRetry(actionResult, status).map {
case r @ DelayAndRetry(delay) =>
if delay > threshold then GiveUp else r
case GiveUp => GiveUp
}

RetryPolicy.withShow[F](
RetryPolicy.withShow[F, Res](
decideNextRetry,
show"limitRetriesByDelay(threshold=$threshold, $policy)"
)

/** Add an upperbound to a policy such that once the cumulative delay over all retries has reached or
* exceeded the given limit, the policy will stop retrying and give up.
*/
def limitRetriesByCumulativeDelay[F[_]: Applicative](
def limitRetriesByCumulativeDelay[F[_]: Applicative, Res](
threshold: FiniteDuration,
policy: RetryPolicy[F]
): RetryPolicy[F] =
def decideNextRetry(status: RetryStatus): F[PolicyDecision] =
policy.decideNextRetry(status).map {
policy: RetryPolicy[F, Res]
): RetryPolicy[F, Res] =
def decideNextRetry(actionResult: Res, status: RetryStatus): F[PolicyDecision] =
policy.decideNextRetry(actionResult, status).map {
case r @ DelayAndRetry(delay) =>
if status.cumulativeDelay + delay >= threshold then GiveUp else r
case GiveUp => GiveUp
}

RetryPolicy.withShow[F](
RetryPolicy.withShow[F, Res](
decideNextRetry,
show"limitRetriesByCumulativeDelay(threshold=$threshold, $policy)"
)

/** Build a dynamic retry policy that chooses the retry policy based on the result of the last attempt
*/
def dynamic[F[_], Res](f: Res => RetryPolicy[F, Res]): RetryPolicy[F, Res] =
def decideNextRetry(actionResult: Res, status: RetryStatus): F[PolicyDecision] =
val policy = f(actionResult)
policy.decideNextRetry(actionResult, status)

RetryPolicy.withShow[F, Res](
decideNextRetry,
show"dynamic(<function>)"
)
end RetryPolicies
100 changes: 58 additions & 42 deletions modules/core/shared/src/main/scala/retry/RetryPolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,22 @@ import cats.arrow.FunctionK
import cats.implicits.*
import cats.Show

case class RetryPolicy[F[_]](
decideNextRetry: RetryStatus => F[PolicyDecision]
/** A retry policy that decides, after a given attempt, whether to delay and retry, or to give up.
*
* @param decideNextRetry
* A function that takes
* - the result of the attempt, which might be a successful value, a failed value or an error
* - information about the retries and delays so far and returns a decision about what to do next
*/
case class RetryPolicy[F[_], -Res](
decideNextRetry: (Res, RetryStatus) => F[PolicyDecision]
):
def show: String = toString

def followedBy(rp: RetryPolicy[F])(using F: Apply[F]): RetryPolicy[F] =
RetryPolicy.withShow(
status =>
F.map2(decideNextRetry(status), rp.decideNextRetry(status)) {
def followedBy[R <: Res](rp: RetryPolicy[F, R])(using F: Apply[F]): RetryPolicy[F, R] =
RetryPolicy.withShow[F, R](
(actionResult, status) =>
F.map2(decideNextRetry(actionResult, status), rp.decideNextRetry(actionResult, status)) {
case (GiveUp, pd) => pd
case (pd, _) => pd
},
Expand All @@ -29,10 +36,10 @@ case class RetryPolicy[F[_]](
* choosing the maximum of the two delays when both of the schedules want to delay the next retry. The dual
* of the `meet` operation.
*/
def join(rp: RetryPolicy[F])(using F: Apply[F]): RetryPolicy[F] =
RetryPolicy.withShow[F](
status =>
F.map2(decideNextRetry(status), rp.decideNextRetry(status)) {
def join[R <: Res](rp: RetryPolicy[F, R])(using F: Apply[F]): RetryPolicy[F, R] =
RetryPolicy.withShow[F, R](
(actionResult, status) =>
F.map2(decideNextRetry(actionResult, status), rp.decideNextRetry(actionResult, status)) {
case (DelayAndRetry(a), DelayAndRetry(b)) => DelayAndRetry(a max b)
case _ => GiveUp
},
Expand All @@ -43,10 +50,10 @@ case class RetryPolicy[F[_]](
* choosing the minimum of the two delays when both of the schedules want to delay the next retry. The dual
* of the `join` operation.
*/
def meet(rp: RetryPolicy[F])(using F: Apply[F]): RetryPolicy[F] =
RetryPolicy.withShow[F](
status =>
F.map2(decideNextRetry(status), rp.decideNextRetry(status)) {
def meet[R <: Res](rp: RetryPolicy[F, R])(using F: Apply[F]): RetryPolicy[F, R] =
RetryPolicy.withShow[F, R](
(actionResult, status) =>
F.map2(decideNextRetry(actionResult, status), rp.decideNextRetry(actionResult, status)) {
case (DelayAndRetry(a), DelayAndRetry(b)) => DelayAndRetry(a min b)
case (s @ DelayAndRetry(_), GiveUp) => s
case (GiveUp, s @ DelayAndRetry(_)) => s
Expand All @@ -57,10 +64,10 @@ case class RetryPolicy[F[_]](

def mapDelay(
f: FiniteDuration => FiniteDuration
)(using F: Functor[F]): RetryPolicy[F] =
)(using F: Functor[F]): RetryPolicy[F, Res] =
RetryPolicy.withShow(
status =>
F.map(decideNextRetry(status)) {
(actionResult, status) =>
F.map(decideNextRetry(actionResult, status)) {
case GiveUp => GiveUp
case DelayAndRetry(d) => DelayAndRetry(f(d))
},
Expand All @@ -69,57 +76,66 @@ case class RetryPolicy[F[_]](

def flatMapDelay(
f: FiniteDuration => F[FiniteDuration]
)(using F: Monad[F]): RetryPolicy[F] =
)(using F: Monad[F]): RetryPolicy[F, Res] =
RetryPolicy.withShow(
status =>
F.flatMap(decideNextRetry(status)) {
(actionResult, status) =>
F.flatMap(decideNextRetry(actionResult, status)) {
case GiveUp => F.pure(GiveUp)
case DelayAndRetry(d) => F.map(f(d))(DelayAndRetry(_))
},
show"$show.flatMapDelay(<function>)"
)

def mapK[N[_]](nt: FunctionK[F, N]): RetryPolicy[N] =
def mapK[N[_]](nt: FunctionK[F, N]): RetryPolicy[N, Res] =
RetryPolicy.withShow(
status => nt(decideNextRetry(status)),
(actionResult, status) => nt(decideNextRetry(actionResult, status)),
show"$show.mapK(<FunctionK>)"
)

def contramap[R](f: R => Res): RetryPolicy[F, R] =
RetryPolicy.withShow[F, R](
(actionResult, status) => decideNextRetry(f(actionResult), status),
show"$show.contramap(<function>)"
)
end RetryPolicy

object RetryPolicy:
def lift[F[_]](
f: RetryStatus => PolicyDecision
def lift[F[_], Res](
f: (Res, RetryStatus) => PolicyDecision
)(using
F: Applicative[F]
): RetryPolicy[F] =
RetryPolicy[F](decideNextRetry = retryStatus => F.pure(f(retryStatus)))
): RetryPolicy[F, Res] = RetryPolicy[F, Res](
decideNextRetry = (actionResult, retryStatus) => F.pure(f(actionResult, retryStatus))
)

def withShow[F[_]](
decideNextRetry: RetryStatus => F[PolicyDecision],
def withShow[F[_], Res](
decideNextRetry: (Res, RetryStatus) => F[PolicyDecision],
pretty: => String
): RetryPolicy[F] =
new RetryPolicy[F](decideNextRetry):
): RetryPolicy[F, Res] =
new RetryPolicy[F, Res](decideNextRetry):
override def show: String = pretty
override def toString: String = pretty

def liftWithShow[F[_]: Applicative](
decideNextRetry: RetryStatus => PolicyDecision,
def liftWithShow[F[_], Res](
decideNextRetry: (Res, RetryStatus) => PolicyDecision,
pretty: => String
): RetryPolicy[F] =
withShow(rs => Applicative[F].pure(decideNextRetry(rs)), pretty)
)(using
F: Applicative[F]
): RetryPolicy[F, Res] =
withShow((actionResult, retryStatus) => F.pure(decideNextRetry(actionResult, retryStatus)), pretty)

given [F[_]](using
given [F[_], Res](using
F: Applicative[F]
): BoundedSemilattice[RetryPolicy[F]] =
new BoundedSemilattice[RetryPolicy[F]]:
override def empty: RetryPolicy[F] =
): BoundedSemilattice[RetryPolicy[F, Res]] =
new BoundedSemilattice[RetryPolicy[F, Res]]:
override def empty: RetryPolicy[F, Res] =
RetryPolicies.constantDelay[F](Duration.Zero)

override def combine(
x: RetryPolicy[F],
y: RetryPolicy[F]
): RetryPolicy[F] = x.join(y)
x: RetryPolicy[F, Res],
y: RetryPolicy[F, Res]
): RetryPolicy[F, Res] = x.join(y)

given [F[_]]: Show[RetryPolicy[F]] =
given [F[_], Res]: Show[RetryPolicy[F, Res]] =
Show.show(_.show)
end RetryPolicy
Loading
Loading