Skip to content

Commit

Permalink
minor fixes, spell check on docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil-Lontkowski committed Jan 27, 2025
1 parent 4e01101 commit dc6d93e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 73 deletions.
18 changes: 14 additions & 4 deletions core/src/main/scala/ox/resilience/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ private[resilience] enum CircuitBreakerState:
case Open(since: Long)
case Closed(since: Long)
case HalfOpen(since: Long, semaphore: Semaphore, completedOperations: Int = 0)
def isSameState(other: CircuitBreakerState): Boolean =
(this, other) match
case (Open(sinceOpen), Open(since)) if since == sinceOpen => true
case (HalfOpen(sinceHalfOpen, _, _), HalfOpen(since, _, _)) if sinceHalfOpen == since => true
case (Closed(sinceClosed), Closed(since)) if sinceClosed == since => true
case _ => false
end CircuitBreakerState

private[resilience] enum CircuitBreakerResult:
case Success
Expand Down Expand Up @@ -53,11 +60,11 @@ end CircuitBreakerStateMachineConfig
/** Circuit Breaker. Operations can be dropped, when the breaker is open or if it doesn't take more operation in halfOpen state. The Circuit
* Breaker might calculate different metrics based on [[SlidingWindow]] provided in config. See [[SlidingWindow]] for more details.
*/
case class CircuitBreaker(config: CircuitBreakerConfig)(using Ox):
case class CircuitBreaker(config: CircuitBreakerConfig)(using ox: Ox, bufferCapacity: BufferCapacity):
private[resilience] val stateMachine = CircuitBreakerStateMachine(config)
private val actorRef: ActorRef[CircuitBreakerStateMachine] = Actor.create(stateMachine)(using sc = BufferCapacity.apply(100))
private val actorRef: ActorRef[CircuitBreakerStateMachine] = Actor.create(stateMachine)

private def tryAcquire: AcquireResult = stateMachine.state match
private def tryAcquire(): AcquireResult = stateMachine.state match
case currState @ CircuitBreakerState.Closed(_) => AcquireResult(true, currState)
case currState @ CircuitBreakerState.Open(_) => AcquireResult(false, currState)
case currState @ CircuitBreakerState.HalfOpen(_, semaphore, _) => AcquireResult(semaphore.tryAcquire(1), currState)
Expand All @@ -73,7 +80,7 @@ case class CircuitBreaker(config: CircuitBreakerConfig)(using Ox):
def runOrDropWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(
operation: => F[T]
): Option[F[T]] =
val acquiredResult = tryAcquire
val acquiredResult = tryAcquire()
if acquiredResult.acquired then
val (duration, result) = timed(operation)
if em.isError(result) then
Expand Down Expand Up @@ -116,3 +123,6 @@ case class CircuitBreaker(config: CircuitBreakerConfig)(using Ox):
def runOrDrop[T](operation: => T): Option[T] =
runOrDropEither(Try(operation).toEither).map(_.fold(throw _, identity))
end CircuitBreaker

object CircuitBreaker:
given default: BufferCapacity = BufferCapacity.apply(100)
74 changes: 20 additions & 54 deletions core/src/main/scala/ox/resilience/CircuitBreakerStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@ private[resilience] case class CircuitBreakerStateMachine(
def state: CircuitBreakerState = _state

def registerResult(result: CircuitBreakerResult, acquired: AcquireResult, selfRef: ActorRef[CircuitBreakerStateMachine]): Unit =
// We check that result was acquired in the same state that we are currently in
val isResultFromCurrentState = (acquired.circuitState, _state) match
case (CircuitBreakerState.Open(sinceOpen), CircuitBreakerState.Open(since)) if since == sinceOpen => true
case (CircuitBreakerState.HalfOpen(sinceHalfOpen, _, _), CircuitBreakerState.HalfOpen(since, _, _)) if sinceHalfOpen == since => true
case (CircuitBreakerState.Closed(sinceClosed), CircuitBreakerState.Closed(since)) if sinceClosed == since => true
case _ => false
// If acquired in different state we don't update results
if isResultFromCurrentState then results.updateResults(result)
updateState(selfRef, Some(acquired))
if acquired.circuitState.isSameState(_state) then
results.updateResults(result)
updateState(selfRef, Some(acquired))
end registerResult

def updateState(selfRef: ActorRef[CircuitBreakerStateMachine], acquiredResult: Option[AcquireResult] = None): Unit =
Expand Down Expand Up @@ -76,27 +71,24 @@ private[resilience] object CircuitBreakerStateMachine:

def nextState(metrics: Metrics, currentState: CircuitBreakerState, config: CircuitBreakerStateMachineConfig): CircuitBreakerState =
val currentTimestamp = metrics.timestamp
// We want to know if last result should be added to completed calls in halfOpen state
val lastCompletedCall = metrics.lastAcquisitionResult match
case Some(AcquireResult(true, CircuitBreakerState.HalfOpen(s, sem, completed))) => 1
case _ => 0
val exceededThreshold = (metrics.failureRate >= config.failureRateThreshold || metrics.slowCallsRate >= config.slowCallThreshold)
val minCallsRecorder = metrics.operationsInWindow >= config.minimumNumberOfCalls
currentState match
case CircuitBreakerState.Closed(since) =>
case self @ CircuitBreakerState.Closed(since) =>
if minCallsRecorder && exceededThreshold then
if config.waitDurationOpenState.toMillis == 0 then
CircuitBreakerState.HalfOpen(currentTimestamp, Semaphore(config.numberOfCallsInHalfOpenState))
else CircuitBreakerState.Open(currentTimestamp)
else CircuitBreakerState.Closed(since)
case CircuitBreakerState.Open(since) =>
else self
case self @ CircuitBreakerState.Open(since) =>
val timePassed = (currentTimestamp - since) >= config.waitDurationOpenState.toMillis
if timePassed || config.waitDurationOpenState.toMillis == 0 then
CircuitBreakerState.HalfOpen(currentTimestamp, Semaphore(config.numberOfCallsInHalfOpenState))
else CircuitBreakerState.Open(since)
if timePassed then CircuitBreakerState.HalfOpen(currentTimestamp, Semaphore(config.numberOfCallsInHalfOpenState))
else self
case CircuitBreakerState.HalfOpen(since, semaphore, completedCalls) =>
lazy val allCallsInHalfOpenCompleted = (completedCalls + lastCompletedCall) >= config.numberOfCallsInHalfOpenState
lazy val timePassed = (currentTimestamp - since) >= config.halfOpenTimeoutDuration.toMillis
// We want to know if last result should be added to completed calls in halfOpen state
val lastCompletedCall = if metrics.lastAcquisitionResult.isDefined then 1 else 0
val allCallsInHalfOpenCompleted = (completedCalls + lastCompletedCall) >= config.numberOfCallsInHalfOpenState
val timePassed = (currentTimestamp - since) >= config.halfOpenTimeoutDuration.toMillis
// if we didn't complete all half open calls but timeout is reached go back to open
if !allCallsInHalfOpenCompleted && config.halfOpenTimeoutDuration.toMillis != 0 && timePassed then
CircuitBreakerState.Open(currentTimestamp)
Expand All @@ -105,12 +97,7 @@ private[resilience] object CircuitBreakerStateMachine:
// If halfOpen calls completed, but rates are still above go back to open
else if allCallsInHalfOpenCompleted && exceededThreshold then CircuitBreakerState.Open(currentTimestamp)
// We didn't complete all half open calls, keep halfOpen
else
metrics.lastAcquisitionResult match
case Some(AcquireResult(true, CircuitBreakerState.HalfOpen(s, _, _)))
if s == since => // Check if this is the same HalfOpen state
CircuitBreakerState.HalfOpen(since, semaphore, completedCalls + 1)
case _ => CircuitBreakerState.HalfOpen(since, semaphore, completedCalls)
else CircuitBreakerState.HalfOpen(since, semaphore, completedCalls + lastCompletedCall)
end if
end match
end nextState
Expand All @@ -124,30 +111,19 @@ private[resilience] sealed trait CircuitBreakerResults(using val ox: Ox):

private[resilience] object CircuitBreakerResults:
case class CountBased(windowSize: Int)(using ox: Ox) extends CircuitBreakerResults(using ox):
private val results = new collection.mutable.ArrayDeque[CircuitBreakerResult](windowSize)
private val results = new collection.mutable.ArrayDeque[CircuitBreakerResult](windowSize + 1)
private var slowCalls = 0
private var failedCalls = 0
private var successCalls = 0

private def clearResults: Unit =
private def clearResults(): Unit =
results.clear()
slowCalls = 0
failedCalls = 0
successCalls = 0

def onStateChange(oldState: CircuitBreakerState, newState: CircuitBreakerState): Unit =
import CircuitBreakerState.*
// we have to match so we don't reset result when for example incrementing completed calls in halfopen state
(oldState, newState) match
case (Closed(_), Open(_) | HalfOpen(_, _, _)) =>
clearResults
case (HalfOpen(_, _, _), Open(_) | Closed(_)) =>
clearResults
case (Open(_), Closed(_) | HalfOpen(_, _, _)) =>
clearResults
case (_, _) => ()
end match
end onStateChange
if !oldState.isSameState(newState) then clearResults()

def updateResults(result: CircuitBreakerResult): Unit =
result match
Expand Down Expand Up @@ -178,7 +154,7 @@ private[resilience] object CircuitBreakerResults:
end CountBased

case class TimeWindowBased(windowDuration: FiniteDuration)(using ox: Ox) extends CircuitBreakerResults(using ox):
// holds timestamp of recored operation and result
// holds timestamp of recorded operation and result
private val results = collection.mutable.ArrayDeque[(Long, CircuitBreakerResult)]()
private var slowCalls = 0
private var failedCalls = 0
Expand All @@ -191,7 +167,7 @@ private[resilience] object CircuitBreakerResults:
successCalls = 0

def calculateMetrics(lastAcquisitionResult: Option[AcquireResult], timestamp: Long): Metrics =
// filter all entries that happend outside sliding window
// filter all entries that happened outside sliding window
val removed = results.removeHeadWhile((time, _) => timestamp > time + windowDuration.toMillis)
removed.foreach { (_, result) =>
result match
Expand Down Expand Up @@ -219,17 +195,7 @@ private[resilience] object CircuitBreakerResults:
results.addOne((System.currentTimeMillis(), result))

def onStateChange(oldState: CircuitBreakerState, newState: CircuitBreakerState): Unit =
import CircuitBreakerState.*
// we have to match so we don't reset result when for example incrementing completed calls in halfopen state
(oldState, newState) match
case (Closed(_), Open(_) | HalfOpen(_, _, _)) =>
clearResults()
case (HalfOpen(_, _, _), Open(_) | Closed(_)) =>
clearResults()
case (Open(_), Closed(_) | HalfOpen(_, _, _)) =>
clearResults()
case (_, _) => ()
end match
end onStateChange
if !oldState.isSameState(newState) then clearResults()

end TimeWindowBased
end CircuitBreakerResults
34 changes: 19 additions & 15 deletions doc/utils/circuit-breaker.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
# Circuit Breaker

The circuit breaker allows controlling execution of operations and stops if certain condition are met. CircuitBreaker is thread-safe and uses [actor](./actors.md) underneath to change breaker state.

```{note}
Since actor executes on one thread this may be bottleneck. That means that calculating state change can be deleyad and breaker can let few more operations to complete before openning.
This can be the case with many very fast operations.
```
The circuit breaker allows controlling execution of operations and stops if certain condition are met. CircuitBreaker is thread-safe and can be used in concurrent scenarios.

## API

Expand All @@ -24,6 +19,9 @@ supervised:

## Configuration

Many config parameters relate to calculated metrics. Those metrics are percentage of calls that failed and percentage of calls that exceeded `slowCallDurationThreshold`.
Which calls are included during calculation of these metrics are determined by `SlidingWindow` configuration.

### Sliding window

There are two ways that metrics are calculated.
Expand All @@ -34,19 +32,19 @@ There are two ways that metrics are calculated.
### Failure rate and slow call rate thresholds

The state of the CircuitBreaker changes from `Closed` to `Open` when the `failureRate` is greater or equal to configurable threshold. For example when 80% of recorded call results failed.
Failures are counted based on provided `ErrorMode`.
Failures are counted based on provided `ErrorMode`. For example any exception that is thrown by the operation, when using the direct, "unwrapped" API or any `Left` variant when using `runOrDropEither`.

The same state change also happen when percentage of slow calls (exceeding `slowCallDurationThreshold`) is equal or greater than configured threshold. For examaple 80% of calls took longer then 10 seconds.
The same state change also happen when percentage of slow calls (exceeding configurable `slowCallDurationThreshold`) is equal or greater than configured threshold. For example 80% of calls took longer then 10 seconds.

Those metrics are considered only when number of recorder calls is greater or equal to `minimumNumberOfCalls`, otherwise we don't change state even if `failureRate` is 100%.

### Parameters

- `failureRateThreshold: PercentageThreshold` - percentage of recorder calls marked as failed required to switch to open state.
- `failureRateThreshold: PercentageThreshold` - percentage of recorded calls marked as failed required to switch to open state.
- `slowCallThreshold: PercentageThreshold` - percentage of recorder calls marked as slow required to switch to open state.
- `slowCallDurationThreshold: FiniteDuration` - duration that call has to exceed to be marked as slow.
- `slidingWindow: SlidingWindow` - mechanism to determine how calls are recorded.
- `minimumNumberOfCalls: Int` - minium number of calls recorded needed for breaker to be able to swtich to open state based on thresholds.
- `minimumNumberOfCalls: Int` - minimum number of calls recorded needed for breaker to be able to switch to open state based on thresholds.
- `waitDurationOpenState: FiniteDuration` - duration that CircuitBreaker will wait before switching from `Open` state to `HalfOpen`.
- `halfOpenTimeoutDuration: FiniteDuration` - timeout for `HalfOpen` state after which, if not enough calls were recorder, breaker will go back to `Open` state. Zero means there is no timeout.
- `numberOfCallsInHalfOpenState: Int` - number of calls recorded in `HalfOpen` state needed to calculate metrics to decide if breaker should go back to `Open` state or `Closed`. It is also maximum number of operations that can be started in this state.
Expand Down Expand Up @@ -79,6 +77,12 @@ numberOfCallsInHalfOpenState = 10
4. State changes from `HalfOpen` to `Open` if `halfOpenTimeoutDuration` passes without enough calls recorded or number of recorder calls is equal to `numberOfCallsInHalfOpenState` and any threshold was exceeded.
5. State changes from `HalfOpen` to `Closed` if `numberOfCallsInHalfOpenState` where completed before timeout and there wasn't any threshold exceeded.


```{note}
CircuitBreaker uses actor internally and since actor executes on one thread this may be bottleneck. That means that calculating state change can be deleyad and breaker can let few more operations to complete before openning.
This can be the case with many very fast operations.
```

## Examples

```scala mdoc:compile-only
Expand All @@ -92,18 +96,18 @@ def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???

supervised:
val ciruictBreaker = CircuitBreaker(CircuitBreakerConfig.default)
val circuitBreaker = CircuitBreaker(CircuitBreakerConfig.default)

// various operation definitions
ciruictBreaker.runOrDrop(directOperation)
ciruictBreaker.runOrDropEither(eitherOperation)
circuitBreaker.runOrDrop(directOperation)
circuitBreaker.runOrDropEither(eitherOperation)

// custom error mode
ciruictBreaker.runOrDropWithErrorMode(UnionMode[String])(unionOperation)
circuitBreaker.runOrDropWithErrorMode(UnionMode[String])(unionOperation)

// retry with circuit breaker inside
retryEither(RetryConfig.backoff(3, 100.millis)){
ciruictBreaker.runOrDrop(directOperation) match
circuitBreaker.runOrDrop(directOperation) match
case Some(value) => Right(value)
case None => Left("Operation dropped")
}
Expand Down

0 comments on commit dc6d93e

Please sign in to comment.