Skip to content

Commit

Permalink
Schedule based on cron expressions (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil-Lontkowski authored Jan 3, 2025
1 parent 5769869 commit 6368555
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 3 deletions.
16 changes: 14 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ compileDocumentation := {
lazy val rootProject = (project in file("."))
.settings(commonSettings)
.settings(publishArtifact := false, name := "ox")
.aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams)
.aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams, cron)

lazy val core: Project = (project in file("core"))
.settings(commonSettings)
Expand Down Expand Up @@ -94,6 +94,17 @@ lazy val flowReactiveStreams: Project = (project in file("flow-reactive-streams"
)
.dependsOn(core)

lazy val cron: Project = (project in file("cron"))
.settings(commonSettings)
.settings(
name := "cron",
libraryDependencies ++= Seq(
"com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.7.0",
scalaTest
)
)
.dependsOn(core % "test->test;compile->compile")

lazy val documentation: Project = (project in file("generated-doc")) // important: it must not be doc/
.enablePlugins(MdocPlugin)
.settings(commonSettings)
Expand All @@ -113,5 +124,6 @@ lazy val documentation: Project = (project in file("generated-doc")) // importan
core,
kafka,
mdcLogback,
flowReactiveStreams
flowReactiveStreams,
cron
)
11 changes: 11 additions & 0 deletions core/src/main/scala/ox/scheduling/Schedule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ object Schedule:

private[scheduling] sealed trait Infinite extends Schedule

/** @param computeNextDuration
* computes time between next invocations of operation. Invocation = 0 represents initialDelay before invoking operation for the first
* time.
*/
private[scheduling] final case class ComputedInfinite(
computeNextDuration: (Int, Option[FiniteDuration]) => FiniteDuration
) extends Infinite:
def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = computeNextDuration(invocation, lastDuration)

override def initialDelay: FiniteDuration = computeNextDuration(0, None)

/** A schedule that represents an initial delay applied before the first invocation of operation being scheduled. Usually used in
* combination with other schedules using [[andThen]]
*
Expand Down
40 changes: 40 additions & 0 deletions cron/src/main/scala/ox/scheduling/cron/CronSchedule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ox.scheduling.cron

import cron4s.lib.javatime.*
import cron4s.{Cron, CronExpr, toDateTimeCronOps}
import ox.scheduling.Schedule

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}

/** Methods in this object provide [[Schedule]] based on supplied cron expression.
*/
object CronSchedule:
/** @param expression
* cron expression to parse
* @return
* [[CronSchedule]] from cron expression
* @throws cron4s.Error
* in case of invalid expression
*/
def unsafeFromString(expression: String): Schedule =
fromCronExpr(Cron.unsafeParse(expression))

/** @param cron
* [[CronExpr]] to base [[Schedule]] on.
* @return
* [[Schedule]] from cron expression
*/
def fromCronExpr(cron: CronExpr): Schedule =
def computeNext(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration =
val now = LocalDateTime.now()
val next = cron.next(now)
val duration = next.map(n => ChronoUnit.MILLIS.between(now, n))
duration.map(FiniteDuration.apply(_, TimeUnit.MILLISECONDS)).getOrElse(Duration.Zero)
end computeNext

Schedule.ComputedInfinite(computeNext)
end fromCronExpr
end CronSchedule
49 changes: 49 additions & 0 deletions cron/src/test/scala/ox/scheduling/cron/CronScheduleTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ox.scheduling.cron

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import cron4s.*
import ox.scheduling.{RepeatConfig, repeat}
import scala.concurrent.duration.*
import ox.util.ElapsedTime

class CronScheduleTest extends AnyFlatSpec with Matchers with ElapsedTime:
behavior of "repeat with cron schedule"

it should "repeat a function every second" in {
// given
val cronExpr = Cron.unsafeParse("* * * ? * *") // every second
val cronSchedule = CronSchedule.fromCronExpr(cronExpr)

var counter = 0

def f =
if counter > 0 then throw new RuntimeException("boom")
else counter += 1

// when
val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig(cronSchedule))(f))

// then
ex.getMessage shouldBe "boom"
counter shouldBe 1
elapsedTime.toMillis should be < 2200L // Run 2 times, so at most 2 secs - 200ms for tolerance
}

it should "provide initial delay" in {
// give
val cronExpr = Cron.unsafeParse("* * * ? * *") // every second
val cronSchedule = CronSchedule.fromCronExpr(cronExpr)

def f =
throw new RuntimeException("boom")

// when
val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig(cronSchedule))(f))

// then
ex.getMessage shouldBe "boom"
elapsedTime.toMillis should be < 1200L // Run 1 time, so at most 1 sec - 200ms for tolerance
elapsedTime.toMillis should be > 0L
}
end CronScheduleTest
69 changes: 69 additions & 0 deletions doc/integrations/cron4s.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Cron scheduler

Dependency:

```scala
"com.softwaremill.ox" %% "cron" % "@VERSION@"
```

This module allows to run schedules based on cron expressions from [cron4s](https://github.com/alonsodomin/cron4s).

`CronSchedule` can be used in all places that requires `Schedule` especially in repeat scenarios.

For defining `CronExpr` see [cron4s documentation](https://www.alonsodomin.me/cron4s/userguide/index.html).

## Api

The cron module exposes methods for creating `Schedule` based on `CronExpr`.

```scala
import ox.scheduling.cron.*
import cron4s.*

repeat(RepeatConfig(CronSchedule.unsafeFromString("10-35 2,4,6 * ? * *")))(operation)
```

## Operation definition

Methods from `ox.scheduling.cron.CronSchedule` define `Schedule`, so they can be plugged into `RepeatConfig` and used with `repeat` API.


## Configuration

All configuration beyond `CronExpr` is provided by the `repeat` API. If an error handling within the operation
is needed, you can use a `retry` inside it (see an example below) or use `scheduled` with `CronSchedule` instead of `repeat`, which allows
full customization.


## Examples

```scala mdoc:compile-only
import ox.UnionMode
import ox.scheduling.cron.CronSchedule
import scala.concurrent.duration.*
import ox.resilience.{RetryConfig, retry}
import ox.scheduling.*
import cron4s.*

def directOperation: Int = ???
def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???

val cronExpr: CronExpr = Cron.unsafeParse("10-35 2,4,6 * ? * *")

// various operation definitions - same syntax
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(directOperation)
repeatEither(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(eitherOperation)

// infinite repeats with a custom strategy
def customStopStrategy: Int => Boolean = ???
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr), customStopStrategy))(directOperation)

// custom error mode
repeatWithErrorMode(UnionMode[String])(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(unionOperation)

// repeat with retry inside
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr))) {
retry(RetryConfig.backoff(3, 100.millis))(directOperation)
}
```
2 changes: 1 addition & 1 deletion doc/utils/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Instance with default configuration can be obtained with `AdaptiveRetry.default`

`retry` will attempt to retry an operation if it throws an exception; `retryEither` will additionally retry, if the result is a `Left`. Finally `retryWithErrorMode` is the most flexible, and allows retrying operations using custom failure modes (such as union types).

The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `T` should be considered failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation.
The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `Either[E, T]` should be considered as a failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation.

### Examples

Expand Down

0 comments on commit 6368555

Please sign in to comment.