Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-git committed Feb 1, 2024
1 parent 4e1e68e commit 2e764df
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 23 deletions.
16 changes: 8 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import com.typesafe.tools.mima.core._

ThisBuild / tlBaseVersion := "0.5" // your current series x.y
ThisBuild / tlBaseVersion := "0.6" // your current series x.y

ThisBuild / organization := "io.chrisdavenport"
ThisBuild / organizationName := "Christopher Davenport"
Expand All @@ -17,13 +17,13 @@ ThisBuild / tlSonatypeUseLegacyHost := true

ThisBuild / testFrameworks += new TestFramework("munit.Framework")

val catsV = "2.9.0"
val catsEffectV = "3.4.8"
val catsV = "2.10.0"
val catsEffectV = "3.5.3"
val scalaTestV = "3.2.9"

val scala213 = "2.13.8"
val scala213 = "2.13.12"
ThisBuild / scalaVersion := scala213
ThisBuild / crossScalaVersions := Seq("2.12.14", scala213, "3.2.2")
ThisBuild / crossScalaVersions := Seq("2.12.18", scala213, "3.3.1")

ThisBuild / licenses := Seq("Apache-2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0.html"))

Expand All @@ -37,9 +37,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
.settings(
name := "circuit",
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % catsV,
"org.typelevel" %%% "cats-effect" % catsEffectV,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
"org.typelevel" %%% "cats-core" % catsV,
"org.typelevel" %%% "cats-effect" % catsEffectV,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M4" % Test,
),
mimaBinaryIssueFilters := Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.circuit.CircuitBreaker#SyncCircuitBreaker.this")
Expand Down
18 changes: 11 additions & 7 deletions core/src/main/scala/io/chrisdavenport/circuit/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* 1. Change Package
* 2. Change Documentation for new packages
* 3. Fix Linking Error for ScalaDoc
* 4. Do not allow closing, if expiration duration has not passed
*/
package io.chrisdavenport.circuit

Expand Down Expand Up @@ -723,11 +724,14 @@ object CircuitBreaker {
def openOnFail[A](f: F[A], poll: Poll[F]): F[A] = {
poll(f).guaranteeCase {
case Outcome.Succeeded(_) =>
ref.modify{
case Closed(_) => (ClosedZero, F.unit)
case HalfOpen => (ClosedZero, onClosed.attempt.void)
case Open(_,_) => (ClosedZero, onClosed.attempt.void)
}.flatten
Temporal[F].realTime.map(_.toMillis).flatMap { now =>
ref.modify {
case Closed(_) => (ClosedZero, F.unit)
case HalfOpen => (ClosedZero, onClosed.attempt.void)
case o: Open if o.expiresAt >= now => (o, F.unit)
case Open(_, _) => (ClosedZero, onClosed.attempt.void)
}.flatten
}
case Outcome.Errored(e) =>
Temporal[F].realTime.map(_.toMillis).flatMap { now =>
ref.modify {
Expand Down Expand Up @@ -792,8 +796,8 @@ object CircuitBreaker {
def protect[A](fa: F[A]): F[A] = {
Concurrent[F].uncancelable{poll =>
ref.get.flatMap {
case _: Closed => openOnFail(fa, poll)
case open: Open => tryReset(open, fa, poll)
case _: Closed => openOnFail(fa, poll)
case open: Open => tryReset(open, fa, poll)
case HalfOpen => onRejected.attempt >> poll(F.raiseError[A](RejectedExecution(HalfOpen)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
package io.chrisdavenport.circuit

import cats.syntax.all._

import scala.concurrent.duration._
import cats.effect._
// import cats.effect.syntax._

// import catalysts.Platform
import munit.CatsEffectSuite

class CircuitBreakerTests extends CatsEffectSuite {
Expand Down Expand Up @@ -304,12 +302,13 @@ class CircuitBreakerTests extends CatsEffectSuite {
}

test("Validate onClosed is called when closing from longRunning openOnFail"){
val resetDuration = 100.milliseconds
val test = for {
cb1 <- CircuitBreaker.of[IO](
maxFailures = 1,
resetTimeout = 1.minute,
backoff = Backoff.constant(1.minute),
maxResetTimeout = 1.minute
resetTimeout = resetDuration,
backoff = Backoff.constant(resetDuration),
maxResetTimeout = resetDuration
)
opened <- Ref[IO].of(false)
closed <- Ref[IO].of(false)
Expand All @@ -321,9 +320,10 @@ class CircuitBreakerTests extends CatsEffectSuite {
completed <- Deferred[IO, Unit]
_ <- (started.complete(()) >> cb.protect(wait.get) >> completed.complete(())).start // Will reset when wait completes
_ <- started.get
_ <- IO.sleep(100.millis)
_ <- IO.sleep(10.millis)
_ <- taskInError.attempt
_ <- opened.get.map(assertEquals(_, true))
_ <- IO.sleep(resetDuration)
_ <- wait.complete(())
_ <- completed.get
didClose <- closed.get
Expand All @@ -332,6 +332,104 @@ class CircuitBreakerTests extends CatsEffectSuite {
test
}

test("Validate behaviour for one slower than expiration call followed by failing fast calls"){
val resetTimeout = 100.milliseconds
val slowDuration = resetTimeout + 50.milliseconds
val test = for {
cb1 <- CircuitBreaker.of[IO](
maxFailures = 1,
resetTimeout = resetTimeout,
backoff = Backoff.constant(resetTimeout),
maxResetTimeout = resetTimeout
)
opened <- Ref[IO].of(false)
halfOpened <- Ref[IO].of(false)
closed <- Ref[IO].of(false)
cb = cb1.doOnOpen(opened.set(true)).doOnClosed(closed.set(true)).doOnHalfOpen(halfOpened.set(true))
dummy = new RuntimeException("dummy")
taskInError = cb.protect(IO[Int](throw dummy))
taskSlowSucceeds = cb.protect(IO.sleep(slowDuration))
_ <- taskSlowSucceeds.start
_ <- cb.state.map {
case _: CircuitBreaker.Closed => assert(true)
case _ => assert(false)
}
_ <- taskInError.attempt
_ <- cb.state.map {
case _: CircuitBreaker.Open => assert(true)
case _ => assert(false)
}
_ <- taskSlowSucceeds.attempt.map{
case Left(_: CircuitBreaker.RejectedExecution) => assert(true)
case _ => assert(false)
}
_ <- IO.sleep(slowDuration + 10.milliseconds) // `taskSlowSucceeds` finishes after expiration and closes `cb`
_ <- taskSlowSucceeds.attempt
_ <- cb.state.map {
case _: CircuitBreaker.Closed => assert(true)
case x => println(x); assert(false)
}
} yield ()

test
}

test("Validate behaviour for one slow call followed by fast calls"){
val resetTimeout = 100.milliseconds
val slowDuration = 50.milliseconds
val test = for {
cb1 <- CircuitBreaker.of[IO](
maxFailures = 1,
resetTimeout = resetTimeout,
backoff = Backoff.constant(resetTimeout),
maxResetTimeout = resetTimeout
)
opened <- Ref[IO].of(false)
halfOpened <- Ref[IO].of(false)
closed <- Ref[IO].of(false)
cb = cb1.doOnOpen(opened.set(true)).doOnClosed(closed.set(true)).doOnHalfOpen(halfOpened.set(true))
dummy = new RuntimeException("dummy")
taskInError = cb.protect(IO[Int](throw dummy))
taskSlowSucceeds = cb.protect(IO.sleep(slowDuration))
_ <- taskSlowSucceeds.start
_ <- cb.state.map {
case _: CircuitBreaker.Closed => assert(true)
case _ => assert(false)
}
_ <- taskInError.attempt
_ <- cb.state.map {
case _: CircuitBreaker.Open => assert(true)
case _ => assert(false)
}
_ <- taskSlowSucceeds.attempt.map{
case Left(_) => assert(true)
case _ => assert(false)
}
_ <- IO.sleep(slowDuration) // `taskSlowSucceeds` finishes before expiration and leaved `cb` open
_ <- cb.state.map {
case _: CircuitBreaker.Open => assert(true)
case _ => assert(false)
}
_ <- IO.sleep(resetTimeout) // next call will half-open `cb`
_ <- IO.racePair(taskSlowSucceeds, taskSlowSucceeds).map {
case Left((Outcome.Errored(_: CircuitBreaker.RejectedExecution), _)) => assert(true)
case Right((_, Outcome.Errored(_: CircuitBreaker.RejectedExecution))) => assert(true)
case _ => assert(false)
}
_ <- cb.state.map {
case CircuitBreaker.HalfOpen => assert(true)
case _ => assert(false)
}
_ <- IO.sleep(resetTimeout)
_ <- cb.state.map {
case _: CircuitBreaker.Closed => assert(true)
case _ => assert(false)
}
} yield ()

test
}

test("should only count allowed exceptions") {
case class MyException(foo: String) extends Throwable

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.8.2
sbt.version=1.9.8

0 comments on commit 2e764df

Please sign in to comment.