diff --git a/build.sbt b/build.sbt index a11f862..73268ea 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ import com.typesafe.tools.mima.core._ -ThisBuild / tlBaseVersion := "0.5" // your current series x.y +ThisBuild / tlBaseVersion := "0.6" // your current series x.y ThisBuild / organization := "io.chrisdavenport" ThisBuild / organizationName := "Christopher Davenport" @@ -17,13 +17,13 @@ ThisBuild / tlSonatypeUseLegacyHost := true ThisBuild / testFrameworks += new TestFramework("munit.Framework") -val catsV = "2.9.0" -val catsEffectV = "3.4.8" +val catsV = "2.10.0" +val catsEffectV = "3.5.3" val scalaTestV = "3.2.9" -val scala213 = "2.13.8" +val scala213 = "2.13.12" ThisBuild / scalaVersion := scala213 -ThisBuild / crossScalaVersions := Seq("2.12.14", scala213, "3.2.2") +ThisBuild / crossScalaVersions := Seq("2.12.18", scala213, "3.3.1") ThisBuild / licenses := Seq("Apache-2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0.html")) @@ -37,9 +37,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) .settings( name := "circuit", libraryDependencies ++= Seq( - "org.typelevel" %%% "cats-core" % catsV, - "org.typelevel" %%% "cats-effect" % catsEffectV, - "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, + "org.typelevel" %%% "cats-core" % catsV, + "org.typelevel" %%% "cats-effect" % catsEffectV, + "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M4" % Test, ), mimaBinaryIssueFilters := Seq( ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.circuit.CircuitBreaker#SyncCircuitBreaker.this") diff --git a/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala b/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala index df439f6..1dd166a 100644 --- a/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala +++ b/core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala @@ -22,6 +22,7 @@ * 1. Change Package * 2. Change Documentation for new packages * 3. Fix Linking Error for ScalaDoc + * 4. Do not allow closing, if expiration duration has not passed */ package io.chrisdavenport.circuit @@ -723,11 +724,14 @@ object CircuitBreaker { def openOnFail[A](f: F[A], poll: Poll[F]): F[A] = { poll(f).guaranteeCase { case Outcome.Succeeded(_) => - ref.modify{ - case Closed(_) => (ClosedZero, F.unit) - case HalfOpen => (ClosedZero, onClosed.attempt.void) - case Open(_,_) => (ClosedZero, onClosed.attempt.void) - }.flatten + Temporal[F].realTime.map(_.toMillis).flatMap { now => + ref.modify { + case Closed(_) => (ClosedZero, F.unit) + case HalfOpen => (ClosedZero, onClosed.attempt.void) + case o: Open if o.expiresAt >= now => (o, F.unit) + case Open(_, _) => (ClosedZero, onClosed.attempt.void) + }.flatten + } case Outcome.Errored(e) => Temporal[F].realTime.map(_.toMillis).flatMap { now => ref.modify { @@ -792,8 +796,8 @@ object CircuitBreaker { def protect[A](fa: F[A]): F[A] = { Concurrent[F].uncancelable{poll => ref.get.flatMap { - case _: Closed => openOnFail(fa, poll) - case open: Open => tryReset(open, fa, poll) + case _: Closed => openOnFail(fa, poll) + case open: Open => tryReset(open, fa, poll) case HalfOpen => onRejected.attempt >> poll(F.raiseError[A](RejectedExecution(HalfOpen))) } } diff --git a/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala b/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala index 00a82ea..e67dadf 100644 --- a/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala +++ b/core/src/test/scala/io/chrisdavenport/circuit/CircuitBreakerTests.scala @@ -25,11 +25,9 @@ package io.chrisdavenport.circuit import cats.syntax.all._ + import scala.concurrent.duration._ import cats.effect._ -// import cats.effect.syntax._ - -// import catalysts.Platform import munit.CatsEffectSuite class CircuitBreakerTests extends CatsEffectSuite { @@ -304,12 +302,13 @@ class CircuitBreakerTests extends CatsEffectSuite { } test("Validate onClosed is called when closing from longRunning openOnFail"){ + val resetDuration = 100.milliseconds val test = for { cb1 <- CircuitBreaker.of[IO]( maxFailures = 1, - resetTimeout = 1.minute, - backoff = Backoff.constant(1.minute), - maxResetTimeout = 1.minute + resetTimeout = resetDuration, + backoff = Backoff.constant(resetDuration), + maxResetTimeout = resetDuration ) opened <- Ref[IO].of(false) closed <- Ref[IO].of(false) @@ -321,9 +320,10 @@ class CircuitBreakerTests extends CatsEffectSuite { completed <- Deferred[IO, Unit] _ <- (started.complete(()) >> cb.protect(wait.get) >> completed.complete(())).start // Will reset when wait completes _ <- started.get - _ <- IO.sleep(100.millis) + _ <- IO.sleep(10.millis) _ <- taskInError.attempt _ <- opened.get.map(assertEquals(_, true)) + _ <- IO.sleep(resetDuration) _ <- wait.complete(()) _ <- completed.get didClose <- closed.get @@ -332,6 +332,104 @@ class CircuitBreakerTests extends CatsEffectSuite { test } + test("Validate behaviour for one slower than expiration call followed by failing fast calls"){ + val resetTimeout = 100.milliseconds + val slowDuration = resetTimeout + 50.milliseconds + val test = for { + cb1 <- CircuitBreaker.of[IO]( + maxFailures = 1, + resetTimeout = resetTimeout, + backoff = Backoff.constant(resetTimeout), + maxResetTimeout = resetTimeout + ) + opened <- Ref[IO].of(false) + halfOpened <- Ref[IO].of(false) + closed <- Ref[IO].of(false) + cb = cb1.doOnOpen(opened.set(true)).doOnClosed(closed.set(true)).doOnHalfOpen(halfOpened.set(true)) + dummy = new RuntimeException("dummy") + taskInError = cb.protect(IO[Int](throw dummy)) + taskSlowSucceeds = cb.protect(IO.sleep(slowDuration)) + _ <- taskSlowSucceeds.start + _ <- cb.state.map { + case _: CircuitBreaker.Closed => assert(true) + case _ => assert(false) + } + _ <- taskInError.attempt + _ <- cb.state.map { + case _: CircuitBreaker.Open => assert(true) + case _ => assert(false) + } + _ <- taskSlowSucceeds.attempt.map{ + case Left(_: CircuitBreaker.RejectedExecution) => assert(true) + case _ => assert(false) + } + _ <- IO.sleep(slowDuration + 10.milliseconds) // `taskSlowSucceeds` finishes after expiration and closes `cb` + _ <- taskSlowSucceeds.attempt + _ <- cb.state.map { + case _: CircuitBreaker.Closed => assert(true) + case x => println(x); assert(false) + } + } yield () + + test + } + + test("Validate behaviour for one slow call followed by fast calls"){ + val resetTimeout = 100.milliseconds + val slowDuration = 50.milliseconds + val test = for { + cb1 <- CircuitBreaker.of[IO]( + maxFailures = 1, + resetTimeout = resetTimeout, + backoff = Backoff.constant(resetTimeout), + maxResetTimeout = resetTimeout + ) + opened <- Ref[IO].of(false) + halfOpened <- Ref[IO].of(false) + closed <- Ref[IO].of(false) + cb = cb1.doOnOpen(opened.set(true)).doOnClosed(closed.set(true)).doOnHalfOpen(halfOpened.set(true)) + dummy = new RuntimeException("dummy") + taskInError = cb.protect(IO[Int](throw dummy)) + taskSlowSucceeds = cb.protect(IO.sleep(slowDuration)) + _ <- taskSlowSucceeds.start + _ <- cb.state.map { + case _: CircuitBreaker.Closed => assert(true) + case _ => assert(false) + } + _ <- taskInError.attempt + _ <- cb.state.map { + case _: CircuitBreaker.Open => assert(true) + case _ => assert(false) + } + _ <- taskSlowSucceeds.attempt.map{ + case Left(_) => assert(true) + case _ => assert(false) + } + _ <- IO.sleep(slowDuration) // `taskSlowSucceeds` finishes before expiration and leaved `cb` open + _ <- cb.state.map { + case _: CircuitBreaker.Open => assert(true) + case _ => assert(false) + } + _ <- IO.sleep(resetTimeout) // next call will half-open `cb` + _ <- IO.racePair(taskSlowSucceeds, taskSlowSucceeds).map { + case Left((Outcome.Errored(_: CircuitBreaker.RejectedExecution), _)) => assert(true) + case Right((_, Outcome.Errored(_: CircuitBreaker.RejectedExecution))) => assert(true) + case _ => assert(false) + } + _ <- cb.state.map { + case CircuitBreaker.HalfOpen => assert(true) + case _ => assert(false) + } + _ <- IO.sleep(resetTimeout) + _ <- cb.state.map { + case _: CircuitBreaker.Closed => assert(true) + case _ => assert(false) + } + } yield () + + test + } + test("should only count allowed exceptions") { case class MyException(foo: String) extends Throwable diff --git a/project/build.properties b/project/build.properties index 46e43a9..abbbce5 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.8.2 +sbt.version=1.9.8