diff --git a/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/Philosophers.scala b/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/Philosophers.scala index 7d3a3b04..3cddb5c2 100644 --- a/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/Philosophers.scala +++ b/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/Philosophers.scala @@ -19,7 +19,13 @@ import org.renaissance.License defaultValue = "500000", summary = "Number of meals consumed by each philosopher thread" ) -@Configuration(name = "test", settings = Array("meal_count = 500")) +@Parameter( + name = "block_meal_count", + defaultValue = "4096", + summary = + "Number of meals representing a block of progress. Determines determines the frequency of camera scans. Must be a power of two." +) +@Configuration(name = "test", settings = Array("meal_count = 500", "block_meal_count = 8")) @Configuration(name = "jmh") final class Philosophers extends Benchmark { @@ -28,17 +34,33 @@ final class Philosophers extends Benchmark { private var threadCountParam: Int = _ + /** Number of meals consumed by each Philosopher thread. */ + private var mealCountParam: Int = _ + /** - * Number of meals consumed by each Philosopher thread. + * Number of meals representing a block of progress which determines + * the frequency of camera scans. Must be power of two to enable + * efficient checking. */ - private var mealCountParam: Int = _ + private var blockMealCountParam: Int = _ override def setUpBeforeAll(c: BenchmarkContext): Unit = { + def isPowerOfTwo(n: Int): Boolean = if (n <= 0) false else (n & (n - 1)) == 0 + threadCountParam = c.parameter("thread_count").toPositiveInteger mealCountParam = c.parameter("meal_count").toPositiveInteger + + blockMealCountParam = c.parameter("block_meal_count").toPositiveInteger + if (!isPowerOfTwo(blockMealCountParam)) { + throw new IllegalArgumentException( + s"the 'block_meal_count' parameter is not a power of two: $blockMealCountParam" + ) + } } - private def validate(forkOwners: Seq[Option[String]], mealsEaten: Seq[Int]): Unit = { + private def validate(result: (Seq[Option[String]], Seq[Int], Int)): Unit = { + val (forkOwners, mealsEaten, scanCount) = result + // All forks should be available, i.e., not owned by anyone. for (i <- 0 until threadCountParam) { Assert.assertEquals(None, forkOwners(i), s"owner of fork %i") @@ -48,13 +70,21 @@ final class Philosophers extends Benchmark { for (i <- 0 until threadCountParam) { Assert.assertEquals(mealCountParam, mealsEaten(i), s"meals eaten by philosopher $i") } + + // The camera performed the expected number of scans. + val expectedScanCount = mealCountParam / blockMealCountParam + Assert.assertEquals(expectedScanCount, scanCount, "camera scans") } override def run(c: BenchmarkContext): BenchmarkResult = { - val (forkOwners, mealsEaten) = RealityShowPhilosophers.run(mealCountParam, threadCountParam) + val result = RealityShowPhilosophers.run( + mealCountParam, + threadCountParam, + blockMealCountParam + ) () => { - validate(forkOwners, mealsEaten) + validate(result) } } diff --git a/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/RealityShowPhilosophers.scala b/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/RealityShowPhilosophers.scala index 820a0c7c..ae869262 100644 --- a/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/RealityShowPhilosophers.scala +++ b/benchmarks/scala-stm/src/main/scala/org/renaissance/scala/stm/RealityShowPhilosophers.scala @@ -1,107 +1,194 @@ package org.renaissance.scala.stm -import scala.annotation.tailrec import scala.collection.mutable -import scala.concurrent.stm._ +import scala.concurrent.stm.atomic +import scala.concurrent.stm.retry +import scala.concurrent.stm.Ref /** * This extends a solution to the dining philosopher's problem to include an - * outside perspective that occasionally examines everything that is - * happening. + * outside perspective that periodically examines what is happening. */ object RealityShowPhilosophers { - class Fork { - val owner = Ref(None: Option[String]) + /** + * Issues quasi-periodic requests to snapshot the state of the "show". + * Instead of regular wall-clock intervals, the requests are triggered + * when certain progress has been made (measured with coarse granularity). + * + * The requests are counted to prevent potential loss of wake-ups due to + * increasing rate of progress (with decreasing contention). This ensures + * a constant number of "camera scans" in the workload. + */ + private final class CameraController(philosopherCount: Int, blockMealCount: Int) { + + /** Mask to simplify block completion checks. Requires [[blockMealCount]] to be a power of 2. */ + private val blockMask = blockMealCount - 1 + + private val blockCount = Ref(0) + private val requestCount = Ref(0) + private val scanCount = Ref(0) + private val terminate = Ref(false) + + /** @return `true` if the current progress amounts to a complete block, `false` otherwise. */ + def isBlockComplete(mealsEaten: Int): Boolean = (mealsEaten & blockMask) == 0 + + /** + * Notifies the controller that a block of progress has been made. + * Triggers a camera scan when enough blocks have been reported. + */ + def notifyBlockComplete(): Unit = { + atomic { implicit txn => + val newBlockCount = blockCount.transformAndGet(_ + 1) + if ((newBlockCount % philosopherCount) == 0) { + requestCount += 1 + } + } + } + + /** + * Instructs the controller to signal termination when the number of scans + * performed reaches the number of scans requested. + */ + def shutdown(): Unit = { + terminate.single() = true + } + + /** + * Waits for a camera "scan" request, unless the number of scans performed + * reached the total (fixed) number of scans to be performed in the workload. + * + * @return scan request index (non-negative), `-1` to indicate termination. + */ + def awaitRequest(): Int = { + atomic { implicit txn => + if (scanCount() < requestCount()) { + scanCount.getAndTransform(_ + 1) + } else if (terminate()) { + // Signal end of processing. + -1 + } else { + retry + } + } + } + } + + private final class Fork(val name: String) { + private[stm] val owner = Ref(None: Option[String]) } - class PhilosopherThread( + private final class PhilosopherThread( val name: String, val meals: Int, left: Fork, - right: Fork + right: Fork, + controller: CameraController ) extends Thread { - val mealsEaten = Ref(0) + private[stm] val mealsEaten = Ref(0) override def run(): Unit = { + val self = Some(name) + for (_ <- 0 until meals) { // Thinking. atomic { implicit txn => - if (!(left.owner().isEmpty && right.owner().isEmpty)) + if (!(left.owner().isEmpty && right.owner().isEmpty)) { retry - left.owner() = Some(name) - right.owner() = Some(name) + } + + left.owner() = self + right.owner() = self } + // Eating. - atomic { implicit txn => - mealsEaten += 1 + val newMealsEaten = atomic { implicit txn => left.owner() = None right.owner() = None + mealsEaten.transformAndGet(_ + 1) + } + + if (controller.isBlockComplete(newMealsEaten)) { + controller.notifyBlockComplete() } } } - - def done: Boolean = mealsEaten.single() == meals - - override def toString: String = - "%s is %5.2f%% done".format(name, mealsEaten.single() * 100.0 / meals) } - class CameraThread( - intervalMilli: Int, + private class CameraThread( forks: Seq[Fork], - philosophers: Seq[PhilosopherThread] + philosophers: Seq[PhilosopherThread], + controller: CameraController ) extends Thread { - val outputs = mutable.Buffer[String]() - - @tailrec final override def run(): Unit = { - Thread.sleep(intervalMilli) - val (str, done) = image - outputs += str - if (!done) { - run() - } else { - // TODO Consistent way of handling stdout. - // See https://github.com/D-iii-S/renaissance-benchmarks/issues/20 - println(s"Camera thread performed ${outputs.length} scans.") + private[stm] val images = mutable.Buffer[String]() + + final override def run(): Unit = { + // Captures "image" of the show's state when requested. + // Finish execution upon receiving an empty request. + while (controller.awaitRequest() >= 0) { + // Separate state snapshot from rendering. + val (forkOwners, mealsEaten) = stateSnapshot + images += renderImage(forkOwners, mealsEaten) } + + // TODO Consistent way of handling stdout. + // See https://github.com/D-iii-S/renaissance-benchmarks/issues/20 + println(s"Camera thread performed ${images.length} scans.") } - /** - * We want to print exactly one image of the final state, so we check - * completion at the same time as building the image. - */ - def image: (String, Boolean) = + def stateSnapshot: (Seq[Option[String]], Seq[Int]) = atomic { implicit txn => - val buf = new StringBuilder - for (i <- forks.indices) - buf ++= "fork %d is owned by %s\n".format(i, forks(i).owner.single()) - var done = true - for (p <- philosophers) { - buf ++= p.toString += '\n' - done &&= p.done - } - (buf.toString, done) + val forkOwners = forks.map(_.owner.get) + val mealsEaten = philosophers.map(_.mealsEaten.get) + (forkOwners, mealsEaten) } - } - def run(mealCount: Int, philosopherCount: Int): (Seq[Option[String]], Seq[Int]) = { - val names = for (i <- 0 until philosopherCount) yield { - s"philosopher-$i" - } - val forks = Array.tabulate(names.size) { _ => - new Fork + private def renderImage(forkOwners: Seq[Option[String]], mealsEaten: Seq[Int]): String = { + val image = new StringBuilder + + forks.zip(forkOwners).foreach { + case (f, owner) => + image ++= "%s is owned by %s\n".format(f.name, owner) + } + + philosophers.zip(mealsEaten).foreach { + case (p, eaten) => + image ++= "%s is %5.2f%% done\n".format(p.name, eaten * 100.0 / p.meals) + } + + image.toString() } - val pthreads = Array.tabulate(names.size) { i => - new PhilosopherThread(names(i), mealCount, forks(i), forks((i + 1) % forks.length)) + } + + def run( + mealCount: Int, + philosopherCount: Int, + blockMealCount: Int + ): (Seq[Option[String]], Seq[Int], Int) = { + val forks = Array.tabulate(philosopherCount) { i => new Fork(s"fork-$i") } + val controller = new CameraController(philosopherCount, blockMealCount) + val philosophers = Array.tabulate(philosopherCount) { i => + new PhilosopherThread( + s"philosopher-$i", + mealCount, + forks(i), + forks((i + 1) % forks.length), + controller + ) } - val camera = new CameraThread(1000 / 60, forks, pthreads) + + val camera = new CameraThread(forks, philosophers, controller) camera.start() - for (t <- pthreads) t.start() - for (t <- pthreads) t.join() + + philosophers.foreach(_.start()) + philosophers.foreach(_.join()) + + // Signal shutdown to the camera after the philosophers finish. + controller.shutdown() camera.join() - atomic { implicit txn => - (forks.map(_.owner.get), pthreads.map(_.mealsEaten.get)) - } + + // Collect fork owners and meals eaten for validation. + val (forkOwners, mealsEaten) = camera.stateSnapshot + (forkOwners, mealsEaten, camera.images.length) } }