-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
124 additions
and
111 deletions.
There are no files selected for viewing
135 changes: 37 additions & 98 deletions
135
commons/src/test/scala/io/github/tassiLuca/CancellationTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,118 +1,57 @@ | ||
package io.github.tassiLuca | ||
|
||
import gears.async.AsyncOperations.sleep | ||
import gears.async.default.given | ||
import gears.async.{Async, Future} | ||
import gears.async.{Async, AsyncOperations, Future} | ||
import org.scalatest.funspec.AnyFunSpec | ||
import org.scalatest.matchers.should.Matchers | ||
import scala.language.postfixOps | ||
|
||
import java.util.concurrent.CancellationException | ||
import scala.util.Failure | ||
|
||
class CancellationTest extends AnyFunSpec with Matchers { | ||
|
||
describe("Structured concurrency") { | ||
it("ensure all nested computations are contained within the lifetime of the enclosing one") { | ||
describe("Cancellation of a `Future`") { | ||
it("can be achieve calling `cancel` method on it") { | ||
Async.blocking: | ||
val before = System.currentTimeMillis() | ||
var terminated = false | ||
val f = Future: | ||
val f1 = Future { "hello" } | ||
val f2 = Future { sleep(2_000); "gears!" } | ||
f1.await + " " + f2.await | ||
f.await shouldBe "hello gears!" | ||
val now = System.currentTimeMillis() | ||
now - before should be > 2_000L | ||
AsyncOperations.sleep(5_000) | ||
terminated = true | ||
AsyncOperations.sleep(1_000) | ||
f.cancel() | ||
val result = f.awaitResult | ||
result.isFailure shouldBe true | ||
intercept[CancellationException](result.get) | ||
terminated shouldBe false | ||
} | ||
} | ||
|
||
describe("in case of failures") { | ||
it("if the first nested computation we wait fails with an exception the other is cancelled") { | ||
Async.blocking: | ||
var stillAlive = false | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
val f1 = Future { throw Error(); "hello" } | ||
val f2 = Future { sleep(2_000); stillAlive = true } | ||
f1.await + " " + f2.await // fortunate case in which the one which fails is the one we wait for | ||
f.awaitResult.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
now - before should be < 2_000L | ||
sleep(3_000) | ||
stillAlive shouldBe false | ||
} | ||
|
||
it("if a nested computation fails while we are waiting for another, the enclosing future is not cancelled") { | ||
Async.blocking: | ||
var stillAlive = false | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
val f1 = Future { throw Error(); "gears!" } | ||
val f2 = Future { sleep(2_000); stillAlive = true; "hello" } | ||
f2.await + " " + f1.await // note the inverted order w.r.t. the previous case | ||
f.awaitResult.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
now - before should be > 2_000L | ||
stillAlive shouldBe true | ||
} | ||
|
||
it("but we can achieve cancellation using zip combinator") { | ||
Async.blocking: | ||
var stillAlive = false | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
val f1 = Future { throw Error(); "gears!" } | ||
val f2 = Future { sleep(2_000); stillAlive = true; "hello" } | ||
f2.zip(f1).await | ||
val result = f.awaitResult | ||
result.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
now - before should be < 2_000L | ||
sleep(3_000) | ||
stillAlive shouldBe false | ||
} | ||
describe("Cancellation of an Async context") { | ||
it("can be achieve calling the `cancel` method on the current group") { | ||
var terminated = false | ||
Async.blocking: | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
AsyncOperations.sleep(5_000) | ||
terminated = true | ||
AsyncOperations.sleep(1_000) | ||
Async.current.group.cancel() | ||
Async.current.group.isCancelled shouldBe true | ||
f.awaitResult.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
(now - before) should (be > 1_000L and be < 5_000L) | ||
terminated shouldBe false | ||
} | ||
|
||
it("allows racing futures cancelling the slower one when one succeeds") { | ||
it("once the context is cancelled no future is completed") { | ||
Async.blocking: | ||
var stillAlive = false | ||
Async.current.group.cancel() | ||
val before = System.currentTimeMillis() | ||
val f1 = Future { sleep(1_000); "faster won" } | ||
val f2 = Future { sleep(2_000); stillAlive = true } | ||
val result = f1.altWithCancel(f2).awaitResult | ||
val f = Future: | ||
AsyncOperations.sleep(5_000) | ||
f.awaitResult.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
now - before should (be > 1_000L and be < 5_000L) | ||
result.isSuccess shouldBe true | ||
result.get shouldBe "faster won" | ||
sleep(3_000) | ||
stillAlive shouldBe false | ||
(now - before) should be < 5_000L | ||
} | ||
} | ||
} | ||
|
||
// object TestCancellation3: | ||
// | ||
// class Producer3(using Async): | ||
// val channel = UnboundedChannel[Int]() | ||
// | ||
// def run(): Future[Unit] = Task { | ||
// channel.send(Random.nextInt()) | ||
// }.schedule(Every(1_000)).run | ||
// | ||
// def cancel(): Unit = Async.current.group.cancel() | ||
// | ||
// @main def testCancellation(): Unit = | ||
// Async.blocking: | ||
// val p = Producer3() | ||
// val f1 = p.run() | ||
// val f2 = Task { | ||
// println(s"${p.channel.read()}!") | ||
// }.schedule(Every(1_000)).run | ||
// Thread.sleep(10_000) | ||
// p.cancel() | ||
// p.run().awaitResult | ||
// | ||
// def produceOn(channel: SendableChannel[Terminable[Item]]): Task[Unit] = | ||
// var i = 0 | ||
// Task { | ||
// println(i) | ||
// i = i + 1 | ||
// channel.send(i) | ||
// }.schedule(RepeatUntilFailure(maxRepetitions = itemsProduced)) | ||
// |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
87 changes: 87 additions & 0 deletions
87
commons/src/test/scala/io/github/tassiLuca/StructuredConcurrencyTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package io.github.tassiLuca | ||
|
||
import gears.async.AsyncOperations.sleep | ||
import gears.async.default.given | ||
import gears.async.{Async, Future} | ||
import org.scalatest.funspec.AnyFunSpec | ||
import org.scalatest.matchers.should.Matchers | ||
import scala.language.postfixOps | ||
|
||
class StructuredConcurrencyTest extends AnyFunSpec with Matchers { | ||
|
||
describe("Structured concurrency") { | ||
it("ensure all nested computations are contained within the lifetime of the enclosing one") { | ||
Async.blocking: | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
val f1 = Future { "hello" } | ||
val f2 = Future { sleep(2_000); "gears!" } | ||
f1.await + " " + f2.await | ||
f.await shouldBe "hello gears!" | ||
val now = System.currentTimeMillis() | ||
now - before should be > 2_000L | ||
} | ||
|
||
describe("in case of failures") { | ||
it("if the first nested computation we wait fails with an exception the other is cancelled") { | ||
Async.blocking: | ||
var stillAlive = false | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
val f1 = Future { throw Error(); "hello" } | ||
val f2 = Future { sleep(2_000); stillAlive = true } | ||
f1.await + " " + f2.await // fortunate case in which the one which fails is the one we wait for | ||
f.awaitResult.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
now - before should be < 2_000L | ||
sleep(3_000) | ||
stillAlive shouldBe false | ||
} | ||
|
||
it("if a nested computation fails while we are waiting for another, the enclosing future is not cancelled") { | ||
Async.blocking: | ||
var stillAlive = false | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
val f1 = Future { throw Error(); "gears!" } | ||
val f2 = Future { sleep(2_000); stillAlive = true; "hello" } | ||
f2.await + " " + f1.await // note the inverted order w.r.t. the previous case | ||
f.awaitResult.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
now - before should be > 2_000L | ||
stillAlive shouldBe true | ||
} | ||
|
||
it("but we can achieve cancellation using zip combinator") { | ||
Async.blocking: | ||
var stillAlive = false | ||
val before = System.currentTimeMillis() | ||
val f = Future: | ||
val f1 = Future { throw Error(); "gears!" } | ||
val f2 = Future { sleep(2_000); stillAlive = true; "hello" } | ||
f2.zip(f1).await | ||
val result = f.awaitResult | ||
result.isFailure shouldBe true | ||
val now = System.currentTimeMillis() | ||
now - before should be < 2_000L | ||
sleep(3_000) | ||
stillAlive shouldBe false | ||
} | ||
} | ||
|
||
it("allows racing futures cancelling the slower one when one succeeds") { | ||
Async.blocking: | ||
var stillAlive = false | ||
val before = System.currentTimeMillis() | ||
val f1 = Future { sleep(1_000); "faster won" } | ||
val f2 = Future { sleep(2_000); stillAlive = true } | ||
val result = f1.altWithCancel(f2).awaitResult | ||
val now = System.currentTimeMillis() | ||
now - before should (be > 1_000L and be < 5_000L) | ||
result.isSuccess shouldBe true | ||
result.get shouldBe "faster won" | ||
sleep(3_000) | ||
stillAlive shouldBe false | ||
} | ||
} | ||
} |