Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the number of "camera scans" constant in the philosophers benchmark #447

Merged
merged 5 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ import org.renaissance.License
defaultValue = "500000",
summary = "Number of meals consumed by each philosopher thread"
)
@Configuration(name = "test", settings = Array("meal_count = 500"))
@Parameter(
name = "block_meal_count",
defaultValue = "4096",
summary =
"Number of meals representing a block of progress. Determines determines the frequency of camera scans. Must be a power of two."
)
@Configuration(name = "test", settings = Array("meal_count = 500", "block_meal_count = 8"))
@Configuration(name = "jmh")
final class Philosophers extends Benchmark {

Expand All @@ -28,17 +34,33 @@ final class Philosophers extends Benchmark {

private var threadCountParam: Int = _

/** Number of meals consumed by each Philosopher thread. */
private var mealCountParam: Int = _

/**
* Number of meals consumed by each Philosopher thread.
* Number of meals representing a block of progress which determines
* the frequency of camera scans. Must be power of two to enable
* efficient checking.
*/
private var mealCountParam: Int = _
private var blockMealCountParam: Int = _

override def setUpBeforeAll(c: BenchmarkContext): Unit = {
def isPowerOfTwo(n: Int): Boolean = if (n <= 0) false else (n & (n - 1)) == 0

threadCountParam = c.parameter("thread_count").toPositiveInteger
mealCountParam = c.parameter("meal_count").toPositiveInteger

blockMealCountParam = c.parameter("block_meal_count").toPositiveInteger
if (!isPowerOfTwo(blockMealCountParam)) {
throw new IllegalArgumentException(
s"the 'block_meal_count' parameter is not a power of two: $blockMealCountParam"
)
}
}

private def validate(forkOwners: Seq[Option[String]], mealsEaten: Seq[Int]): Unit = {
private def validate(result: (Seq[Option[String]], Seq[Int], Int)): Unit = {
val (forkOwners, mealsEaten, scanCount) = result

// All forks should be available, i.e., not owned by anyone.
for (i <- 0 until threadCountParam) {
Assert.assertEquals(None, forkOwners(i), s"owner of fork %i")
Expand All @@ -48,13 +70,21 @@ final class Philosophers extends Benchmark {
for (i <- 0 until threadCountParam) {
Assert.assertEquals(mealCountParam, mealsEaten(i), s"meals eaten by philosopher $i")
}

// The camera performed the expected number of scans.
val expectedScanCount = mealCountParam / blockMealCountParam
Assert.assertEquals(expectedScanCount, scanCount, "camera scans")
}

override def run(c: BenchmarkContext): BenchmarkResult = {
val (forkOwners, mealsEaten) = RealityShowPhilosophers.run(mealCountParam, threadCountParam)
val result = RealityShowPhilosophers.run(
mealCountParam,
threadCountParam,
blockMealCountParam
)

() => {
validate(forkOwners, mealsEaten)
validate(result)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,107 +1,194 @@
package org.renaissance.scala.stm

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.stm._
import scala.concurrent.stm.atomic
import scala.concurrent.stm.retry
import scala.concurrent.stm.Ref

/**
* This extends a solution to the dining philosopher's problem to include an
* outside perspective that occasionally examines everything that is
* happening.
* outside perspective that periodically examines what is happening.
*/
object RealityShowPhilosophers {

class Fork {
val owner = Ref(None: Option[String])
/**
* Issues quasi-periodic requests to snapshot the state of the "show".
* Instead of regular wall-clock intervals, the requests are triggered
* when certain progress has been made (measured with coarse granularity).
*
* The requests are counted to prevent potential loss of wake-ups due to
* increasing rate of progress (with decreasing contention). This ensures
* a constant number of "camera scans" in the workload.
*/
private final class CameraController(philosopherCount: Int, blockMealCount: Int) {

/** Mask to simplify block completion checks. Requires [[blockMealCount]] to be a power of 2. */
private val blockMask = blockMealCount - 1

private val blockCount = Ref(0)
private val requestCount = Ref(0)
private val scanCount = Ref(0)
private val terminate = Ref(false)

/** @return `true` if the current progress amounts to a complete block, `false` otherwise. */
def isBlockComplete(mealsEaten: Int): Boolean = (mealsEaten & blockMask) == 0

/**
* Notifies the controller that a block of progress has been made.
* Triggers a camera scan when enough blocks have been reported.
*/
def notifyBlockComplete(): Unit = {
atomic { implicit txn =>
val newBlockCount = blockCount.transformAndGet(_ + 1)
if ((newBlockCount % philosopherCount) == 0) {
requestCount += 1
}
}
}

/**
* Instructs the controller to signal termination when the number of scans
* performed reaches the number of scans requested.
*/
def shutdown(): Unit = {
terminate.single() = true
}

/**
* Waits for a camera "scan" request, unless the number of scans performed
* reached the total (fixed) number of scans to be performed in the workload.
*
* @return scan request index (non-negative), `-1` to indicate termination.
*/
def awaitRequest(): Int = {
atomic { implicit txn =>
if (scanCount() < requestCount()) {
scanCount.getAndTransform(_ + 1)
} else if (terminate()) {
// Signal end of processing.
-1
} else {
retry
}
}
}
}

private final class Fork(val name: String) {
private[stm] val owner = Ref(None: Option[String])
}

class PhilosopherThread(
private final class PhilosopherThread(
val name: String,
val meals: Int,
left: Fork,
right: Fork
right: Fork,
controller: CameraController
) extends Thread {
val mealsEaten = Ref(0)
private[stm] val mealsEaten = Ref(0)

override def run(): Unit = {
val self = Some(name)

for (_ <- 0 until meals) {
// Thinking.
atomic { implicit txn =>
if (!(left.owner().isEmpty && right.owner().isEmpty))
if (!(left.owner().isEmpty && right.owner().isEmpty)) {
retry
left.owner() = Some(name)
right.owner() = Some(name)
}

left.owner() = self
right.owner() = self
}

// Eating.
atomic { implicit txn =>
mealsEaten += 1
val newMealsEaten = atomic { implicit txn =>
left.owner() = None
right.owner() = None
mealsEaten.transformAndGet(_ + 1)
}

if (controller.isBlockComplete(newMealsEaten)) {
controller.notifyBlockComplete()
}
}
}

def done: Boolean = mealsEaten.single() == meals

override def toString: String =
"%s is %5.2f%% done".format(name, mealsEaten.single() * 100.0 / meals)
}

class CameraThread(
intervalMilli: Int,
private class CameraThread(
forks: Seq[Fork],
philosophers: Seq[PhilosopherThread]
philosophers: Seq[PhilosopherThread],
controller: CameraController
) extends Thread {
val outputs = mutable.Buffer[String]()

@tailrec final override def run(): Unit = {
Thread.sleep(intervalMilli)
val (str, done) = image
outputs += str
if (!done) {
run()
} else {
// TODO Consistent way of handling stdout.
// See https://github.com/D-iii-S/renaissance-benchmarks/issues/20
println(s"Camera thread performed ${outputs.length} scans.")
private[stm] val images = mutable.Buffer[String]()

final override def run(): Unit = {
// Captures "image" of the show's state when requested.
// Finish execution upon receiving an empty request.
while (controller.awaitRequest() >= 0) {
// Separate state snapshot from rendering.
val (forkOwners, mealsEaten) = stateSnapshot
images += renderImage(forkOwners, mealsEaten)
}

// TODO Consistent way of handling stdout.
// See https://github.com/D-iii-S/renaissance-benchmarks/issues/20
println(s"Camera thread performed ${images.length} scans.")
}

/**
* We want to print exactly one image of the final state, so we check
* completion at the same time as building the image.
*/
def image: (String, Boolean) =
def stateSnapshot: (Seq[Option[String]], Seq[Int]) =
atomic { implicit txn =>
val buf = new StringBuilder
for (i <- forks.indices)
buf ++= "fork %d is owned by %s\n".format(i, forks(i).owner.single())
var done = true
for (p <- philosophers) {
buf ++= p.toString += '\n'
done &&= p.done
}
(buf.toString, done)
val forkOwners = forks.map(_.owner.get)
val mealsEaten = philosophers.map(_.mealsEaten.get)
(forkOwners, mealsEaten)
}
}

def run(mealCount: Int, philosopherCount: Int): (Seq[Option[String]], Seq[Int]) = {
val names = for (i <- 0 until philosopherCount) yield {
s"philosopher-$i"
}
val forks = Array.tabulate(names.size) { _ =>
new Fork
private def renderImage(forkOwners: Seq[Option[String]], mealsEaten: Seq[Int]): String = {
val image = new StringBuilder

forks.zip(forkOwners).foreach {
case (f, owner) =>
image ++= "%s is owned by %s\n".format(f.name, owner)
}

philosophers.zip(mealsEaten).foreach {
case (p, eaten) =>
image ++= "%s is %5.2f%% done\n".format(p.name, eaten * 100.0 / p.meals)
}

image.toString()
}
val pthreads = Array.tabulate(names.size) { i =>
new PhilosopherThread(names(i), mealCount, forks(i), forks((i + 1) % forks.length))
}

def run(
mealCount: Int,
philosopherCount: Int,
blockMealCount: Int
): (Seq[Option[String]], Seq[Int], Int) = {
val forks = Array.tabulate(philosopherCount) { i => new Fork(s"fork-$i") }
val controller = new CameraController(philosopherCount, blockMealCount)
val philosophers = Array.tabulate(philosopherCount) { i =>
new PhilosopherThread(
s"philosopher-$i",
mealCount,
forks(i),
forks((i + 1) % forks.length),
controller
)
}
val camera = new CameraThread(1000 / 60, forks, pthreads)

val camera = new CameraThread(forks, philosophers, controller)
camera.start()
for (t <- pthreads) t.start()
for (t <- pthreads) t.join()

philosophers.foreach(_.start())
philosophers.foreach(_.join())

// Signal shutdown to the camera after the philosophers finish.
controller.shutdown()
camera.join()
atomic { implicit txn =>
(forks.map(_.owner.get), pthreads.map(_.mealsEaten.get))
}

// Collect fork owners and meals eaten for validation.
val (forkOwners, mealsEaten) = camera.stateSnapshot
(forkOwners, mealsEaten, camera.images.length)
}
}