Skip to content

Commit

Permalink
fix(TerminableChannel): synchronized block
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Feb 29, 2024
1 parent b89726f commit 9228f91
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.tassiLuca.pimping

import gears.async.{Async, BufferedChannel, Channel, SyncChannel, UnboundedChannel}
import gears.async.Channel.Closed
import gears.async.{Async, BufferedChannel, Channel, Future, Listener, SyncChannel, UnboundedChannel}

import scala.annotation.tailrec
import scala.reflect.ClassTag
Expand Down Expand Up @@ -33,16 +34,14 @@ object TerminableChannel:

override val readSource: Async.Source[Res[Terminable[T]]] =
c.readSource.transformValuesWith {
case v @ Right(Terminated) =>
c.close()
v
case Right(Terminated) => c.close(); Left(Channel.Closed)
case v @ _ => v
}

override def sendSource(x: Terminable[T]): Async.Source[Res[Unit]] = x match
case Terminated =>
if synchronized(_terminated) then throw IllegalStateException("Channel already terminated!")
else synchronized { _terminated = true }
synchronized:
if _terminated then throw IllegalStateException("Channel already terminated!") else _terminated = true
c.sendSource(x)
case t => c.sendSource(t)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.tassiLuca.pimping

import gears.async.default.given
import gears.async.TaskSchedule.Every
import gears.async.{Async, AsyncOperations, Channel, Listener, SendableChannel, Task}
import gears.async.{Async, AsyncOperations, Channel, Listener, SendableChannel, Task, UnboundedChannel}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import io.github.tassiLuca.pimping.TerminableChannelOps.foreach
Expand All @@ -17,28 +17,18 @@ class ChannelsPimpingTest extends AnyFunSpec with Matchers {
Async.blocking:
val channel = TerminableChannel.ofUnbounded[Item]
channel.terminate()
channel.read() shouldBe Right(Terminated)
channel.read() shouldBe Left(Channel.Closed)
channel.read() shouldBe Left(Channel.Closed)
}

it("once closed, should be traversable") {
Async.blocking:
var collectedItems = Seq[Item]()
val channel = TerminableChannel.ofUnbounded[Item]
produceOn(channel).run.onComplete(Listener { (_, _) => channel.send(Terminated) })
produceOn(channel).run.onComplete(Listener { (_, _) => channel.terminate() })
channel.foreach(res => collectedItems = collectedItems :+ res)
collectedItems shouldBe Seq.range(0, itemsProduced)
}

it("Should again throw") {
Async.blocking:
val channel = TerminableChannel.ofUnbounded[Item]
produceOn(channel).run.onComplete(Listener { (_, _) => channel.send(Terminated) })
channel.foreach(res =>
println(s"test3 : $res")
println(res),
)
}
}

def produceOn(channel: TerminableChannel[Item]): Task[Unit] =
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {

gitHooks {
preCommit {
tasks("check")
tasks("test")
}
commitMsg {
conventionalCommits()
Expand Down

0 comments on commit 9228f91

Please sign in to comment.