diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 8ee1c681..44b0b39c 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -3,7 +3,5 @@ - - \ No newline at end of file diff --git a/commons/src/main/scala/io/github/tassiLuca/pimping/TerminableChannel.scala b/commons/src/main/scala/io/github/tassiLuca/pimping/TerminableChannel.scala index 0c47d2bd..7f2bfeaa 100644 --- a/commons/src/main/scala/io/github/tassiLuca/pimping/TerminableChannel.scala +++ b/commons/src/main/scala/io/github/tassiLuca/pimping/TerminableChannel.scala @@ -1,9 +1,10 @@ package io.github.tassiLuca.pimping import gears.async.Channel.Closed -import gears.async.{Async, BufferedChannel, Channel, Future, Listener, SyncChannel, UnboundedChannel} +import gears.async.{Async, BufferedChannel, Channel, SyncChannel, UnboundedChannel, uninterruptible} import scala.annotation.tailrec +import scala.language.postfixOps import scala.reflect.ClassTag /** A token to be sent to a channel to signal that it has been terminated. */ @@ -11,8 +12,12 @@ case object Terminated type Terminated = Terminated.type +/** A union type of [[T]] and [[Terminated]]. */ type Terminable[T] = T | Terminated +/** A [[Channel]] that can be terminated, signalling no more items will be sent, + * still allowing to consumer to read pending values. + */ trait TerminableChannel[T] extends Channel[Terminable[T]]: def terminate()(using Async): Unit @@ -47,7 +52,8 @@ object TerminableChannel: override def close(): Unit = c.close() - override def terminate()(using Async): Unit = c.send(Terminated) + override def terminate()(using Async): Unit = uninterruptible: + c.send(Terminated) object TerminableChannelOps: diff --git a/commons/src/test/scala/io/github/tassiLuca/ChannelsContextTest.scala b/commons/src/test/scala/io/github/tassiLuca/ChannelsContextTest.scala index 5916318a..8c85a411 100644 --- a/commons/src/test/scala/io/github/tassiLuca/ChannelsContextTest.scala +++ b/commons/src/test/scala/io/github/tassiLuca/ChannelsContextTest.scala @@ -6,8 +6,9 @@ import gears.async.TaskSchedule.{Every, RepeatUntilFailure} import gears.async.default.given import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers +import io.github.tassiLuca.pimping.ChannelsPimping.toTry -import scala.util.Random +import scala.util.{Failure, Random} class ChannelsContextTest extends AnyFunSpec with Matchers { @@ -17,7 +18,7 @@ class ChannelsContextTest extends AnyFunSpec with Matchers { describe("Consumer") { it("read no item if the producer is run in another context") { var i = 0 - val channel = BufferedChannel[Int](itemsProduced) + val channel = BufferedChannel[Item](itemsProduced) Async.blocking: channel.consume { case Left(_) => () @@ -27,6 +28,17 @@ class ChannelsContextTest extends AnyFunSpec with Matchers { produceOn(channel).run.await i shouldBe 0 } + + it("receive a Cancellation exception if a channel is used as a container of futures produced in other process") { + Async.blocking: + val channel = UnboundedChannel[Future[Item]]() + Future: + for _ <- 0 to itemsProduced do channel.send(Future { AsyncOperations.sleep(5_000); 0 }) + for _ <- 0 to itemsProduced do + val result = channel.read().toTry().flatMap(_.awaitResult) + result.isFailure shouldBe true + intercept[CancellationException](result.get) + } } /* diff --git a/commons/src/test/scala/io/github/tassiLuca/pimping/ChannelsPimpingTest.scala b/commons/src/test/scala/io/github/tassiLuca/pimping/ChannelsPimpingTest.scala index 232591bb..0fc2817e 100644 --- a/commons/src/test/scala/io/github/tassiLuca/pimping/ChannelsPimpingTest.scala +++ b/commons/src/test/scala/io/github/tassiLuca/pimping/ChannelsPimpingTest.scala @@ -25,7 +25,7 @@ class ChannelsPimpingTest extends AnyFunSpec with Matchers { Async.blocking: var collectedItems = Seq[Item]() val channel = TerminableChannel.ofUnbounded[Item] - produceOn(channel).run.onComplete(Listener { (_, _) => channel.terminate() }) + produceOn(channel).run.onComplete(Listener((_, _) => channel.send(Terminated))) channel.foreach(res => collectedItems = collectedItems :+ res) collectedItems shouldBe Seq.range(0, itemsProduced) } diff --git a/docs/content/res/schemas/.$diagrams.drawio.bkp b/docs/content/res/schemas/.$diagrams.drawio.bkp index d3ad83ee..86d4fbeb 100644 --- a/docs/content/res/schemas/.$diagrams.drawio.bkp +++ b/docs/content/res/schemas/.$diagrams.drawio.bkp @@ -1,6 +1,6 @@ - + - + @@ -101,7 +101,7 @@ - + @@ -122,14 +122,14 @@ - + - - + + diff --git a/docs/content/res/schemas/diagrams.drawio b/docs/content/res/schemas/diagrams.drawio index 565d96a1..86a152c6 100644 --- a/docs/content/res/schemas/diagrams.drawio +++ b/docs/content/res/schemas/diagrams.drawio @@ -1,4 +1,4 @@ - + @@ -122,14 +122,14 @@ - + - - + +