Skip to content

Commit

Permalink
test: add a sleep to see if real cancellation or not
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Jan 16, 2024
1 parent c262894 commit 55b2582
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/main/scala/io/github/tassiLuca/UseChannelMultiplexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ import scala.util.{Random, Try}
/** ChannelMultiplexer := active entity which constantly (i.e. loops, indeed the [[run]] method is blocking!!) reads the
* producers channels' and forward the read value towards all the subscribers channels'. Order is guaranteed only per
* producer. Consumers must loops to continually listen for new values!
*
* Remarks:
* - if producer channel is bounded, even with no consumers, it will continue producing items onto it (the
* multiplexer reads value, freeing up space into the buffer)
* - if a consumer start reading value after the producer started, missing some values, those values are gone => like
* "hot observables" in Rx.
*/
object UseChannelMultiplexer extends App:

type Item = String

/** The number of producers. */
val producers = 2
val producers = 1

/** The number of consumers. */
val consumers = 3
val consumers = 1

/** The size of the bounded channel. */
val bufferSize = 10
Expand All @@ -46,15 +52,16 @@ object UseChannelMultiplexer extends App:
Async.blocking:
val multiplexer = ChannelMultiplexer[Item]()
Future {
// blocking call until the multiplexer is cancelled => needs to be called on a new thread
// blocking call until the multiplexer is closed => needs to be called on a new thread
multiplexer.run()
}
for i <- 0 until producers do
val (producer, producerChannel) = scheduledProducer(i.toString, Random.nextLong(4_000))
multiplexer.addPublisher(producerChannel.asReadable)
producer.run
Thread.sleep(10_000)
for i <- 0 until consumers do
val (consumer, consumerChannel) = loopedConsumer(i.toString)
multiplexer.addSubscriber(consumerChannel.asSendable)
consumer.run
for i <- 0 until producers do
val (producer, producerChannel) = scheduledProducer(i.toString, Random.nextLong(6_000) + 2_000)
multiplexer.addPublisher(producerChannel.asReadable)
producer.run
Thread.sleep(30_000)
Thread.sleep(60_000)
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ class BlogPostsServiceTest extends AnyFlatSpec with BeforeAndAfterEach:
val result = blogPostsApp.service.create("unauthorized", postTitle, postBody)
result.isLeft shouldBe true
// the cancelling can be observed looking at the logs :(
Thread.sleep(15_000)
}

0 comments on commit 55b2582

Please sign in to comment.