Skip to content

Commit

Permalink
test: fix test errors
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Mar 3, 2024
1 parent 800f2b4 commit 403f07e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import io.github.tassiLuca.pimping.asTry
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import scala.util.{Random, Try}
import scala.util.Random

class ChannelsContextTest extends AnyFunSpec with Matchers {

type Item = Int
val itemsProduced = 10
val items = 10

describe("Consumer") {
it("read no item if the producer is run in another context") {
var i = 0
val channel = BufferedChannel[Item](itemsProduced)
val channel = BufferedChannel[Item](items)
Async.blocking:
channel.consume {
case Left(_) => ()
Expand All @@ -33,38 +33,21 @@ class ChannelsContextTest extends AnyFunSpec with Matchers {
Async.blocking:
val channel = UnboundedChannel[Future[Item]]()
Future:
for _ <- 0 to itemsProduced do channel.send(Future { AsyncOperations.sleep(5_000); 0 })
for _ <- 0 to itemsProduced do
for _ <- 0 until items do channel.send(Future { AsyncOperations.sleep(2_000); 0 })
for _ <- 0 until items do
val result = channel.read().asTry.flatMap(_.awaitResult)
result.isFailure shouldBe true
intercept[CancellationException](result.get)
}

it("but should work putting the send inside a future") {
it("should work spawning futures and await all") {
Async.blocking:
val channel = UnboundedChannel[Future[Item]]()
for _ <- 0 to itemsProduced do
Future { AsyncOperations.sleep(5_000); 0 }
.onComplete(Listener((_, f) => channel.send(f.asInstanceOf[Future[Item]])))
for _ <- 0 to itemsProduced do
val result = channel.read().asTry.flatMap(_.awaitResult)
result.isSuccess shouldBe true
result.get shouldBe 0
}

it("instead of future, their results") {
Async.blocking:
val channel = UnboundedChannel[Try[Item]]()
Future:
var fs: Seq[Future[Item]] = Seq()
for _ <- 0 to itemsProduced do
val f = Future { AsyncOperations.sleep(5_000); 100 }
fs = fs :+ f
f.onComplete(Listener((r, _) => channel.send(r)))
fs.awaitAll
for _ <- 0 to itemsProduced do
val result = channel.read()
println(result)
var fs: Seq[Future[Item]] = Seq()
for _ <- 0 until items do
fs = fs :+ Future:
AsyncOperations.sleep(2_000)
1
fs.awaitAll.sum shouldBe items
}
}

Expand All @@ -83,7 +66,7 @@ class ChannelsContextTest extends AnyFunSpec with Matchers {

def produceOn(channel: SendableChannel[Item]): Task[Unit] = Task {
channel.send(Random.nextInt())
}.schedule(Every(500, maxRepetitions = itemsProduced))
}.schedule(Every(500, maxRepetitions = items))

extension (channel: ReadableChannel[Item])
def consume(action: Either[Closed, Item] => Unit): Task[Unit] = Task {
Expand Down
103 changes: 54 additions & 49 deletions commons/src/test/scala/io/github/tassiLuca/pimping/FlowTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,47 @@ class FlowTest extends AnyFunSpec with Matchers:

describe("Flows") {
it("are cold streams") {
var emitted: Seq[Try[Item]] = Seq()
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
val flow = simpleFlow
AsyncOperations.sleep(2_000)
emitted should be(empty)
flow.collect(value => emitted = emitted :+ value)
emitted shouldBe Seq.range(0, items).map(Success(_))
collected should be(empty)
flow.collect(value => collected = collected :+ value)
collected shouldBe Seq.range(0, items).map(Success(_))
}

it("calling collect multiple times should emit the same values") {
var emitted1: Seq[Try[Item]] = Seq()
var emitted2: Seq[Try[Item]] = Seq()
var collected1: Seq[Try[Item]] = Seq()
var collected2: Seq[Try[Item]] = Seq()
Async.blocking:
val flow = simpleFlow
flow.collect(value => emitted1 = emitted1 :+ value)
flow.collect(value => emitted2 = emitted2 :+ value)
emitted1 shouldBe Seq.range(0, items).map(Success(_))
emitted2 shouldBe Seq.range(0, items).map(Success(_))
flow.collect(value => collected1 = collected1 :+ value)
flow.collect(value => collected2 = collected2 :+ value)
collected1 shouldBe Seq.range(0, items).map(Success(_))
collected2 shouldBe Seq.range(0, items).map(Success(_))
}

it("if collected concurrently by multiple Futures should emit the same values as well") {
var collected1: Seq[Try[Item]] = Seq()
var collected2: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted1: Seq[Try[Item]] = Seq()
var emitted2: Seq[Try[Item]] = Seq()
val flow = simpleFlow
val f1 = Future:
flow.collect(value => emitted1 = emitted1 :+ value)
flow.collect(value => collected1 = collected1 :+ value)
val f2 = Future:
flow.collect(value => emitted2 = emitted2 :+ value)
flow.collect(value => collected2 = collected2 :+ value)
(f1 :: f2 :: Nil).awaitAll
emitted1 shouldBe Seq.range(0, items).map(Success(_))
emitted2 shouldBe Seq.range(0, items).map(Success(_))
collected1 shouldBe Seq.range(0, items).map(Success(_))
collected2 shouldBe Seq.range(0, items).map(Success(_))
}

it("when throwing an exception inside the `body` should emit a failure and stop flowing") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
failingFlow.collect(value => collected = collected :+ value)
collected.size shouldBe 1
collected.head.isFailure shouldBe true
intercept[IllegalStateException](collected.head.get)
}

it("should work as well with futures") {
Expand All @@ -72,69 +72,74 @@ class FlowTest extends AnyFunSpec with Matchers:
fs.map(_.await) shouldBe Seq.range(0, items)
}

it("allows to cancel the task execution inside a collect") {
it("allows canceling the task execution inside a collect") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
val longLastingFlow = Flow:
(0 until items).foreach { x =>
AsyncOperations.sleep(1_000); it.emit(x)
}
longLastingFlow.collect(v => if v == Success(3) then Async.current.group.cancel() else println(v))
longLastingFlow.collect { value =>
if value == Success(3) then Async.current.group.cancel()
else collected = collected :+ value
}
collected shouldBe Seq.range(0, 3).map(Success(_))
}

describe("Flows `map`") {
it("should work") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
val f = (x: Item) => x * x
simpleFlow.map(f).collect(value => emitted = emitted :+ value)
emitted shouldBe Seq.range(0, items).map(f).map(Success(_))
simpleFlow.map(f).collect(value => collected = collected :+ value)
collected shouldBe Seq.range(0, items).map(f).map(Success(_))
}

it("should emit a `Failure` if an exception is thrown in the original flow") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.map(_ * 2).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
failingFlow.map(_ * 2).collect(value => collected = collected :+ value)
collected.size shouldBe 1
collected.head.isFailure shouldBe true
intercept[IllegalStateException](collected.head.get)
}

it("should emit a `Failure` if an exception is thrown in the given function") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
val failingFunction = (_: Item) => throw IllegalStateException()
simpleFlow.map(failingFunction).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
simpleFlow.map(failingFunction).collect(value => collected = collected :+ value)
collected.size shouldBe 1
collected.head.isFailure shouldBe true
intercept[IllegalStateException](collected.head.get)
}
}

describe("Flows `flatMap`") {
it("should work") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
val f = (x: Item) => Flow { it.emit(x); it.emit(x + 1) }
alternatingFlow.flatMap(f).collect(value => emitted = emitted :+ value)
emitted shouldBe Seq.range(0, items).map(Success(_))
alternatingFlow.flatMap(f).collect(value => collected = collected :+ value)
collected shouldBe Seq.range(0, items).map(Success(_))
}

it("should emit a `Failure` if an exception is thrown in the original flow") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.flatMap(_ => alternatingFlow).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
failingFlow.flatMap(_ => alternatingFlow).collect(value => collected = collected :+ value)
collected.size shouldBe 1
collected.head.isFailure shouldBe true
intercept[IllegalStateException](collected.head.get)
}

it("should emit a `Failure` if an exception is thrown in the given function") {
var collected: Seq[Try[Item]] = Seq()
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.flatMap(_ => throw IllegalStateException()).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
failingFlow.flatMap(_ => throw IllegalStateException()).collect(value => collected = collected :+ value)
collected.size shouldBe 1
collected.head.isFailure shouldBe true
intercept[IllegalStateException](collected.head.get)
}
}
}
Expand Down

0 comments on commit 403f07e

Please sign in to comment.