From 3dc95b6696aa5ae5654dc723fcdbbe016c68044b Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 21 Feb 2022 17:05:41 +0100 Subject: [PATCH] attempt to use external submission for rescheduling of actor mailboxes To avoid fairness issue on JDK 17. Refs #17341, #31117 --- .../scala/akka/actor/ActorSystemSpec.scala | 5 +++-- .../dispatch/ForkJoinPoolStarvationSpec.scala | 2 +- .../akka/dispatch/AbstractDispatcher.scala | 7 ++++--- .../akka/dispatch/BalancingDispatcher.scala | 4 ++-- .../main/scala/akka/dispatch/Dispatcher.scala | 12 ++++++++---- .../ForkJoinExecutorConfigurator.scala | 18 +++++++++++++++--- .../src/main/scala/akka/dispatch/Mailbox.scala | 2 +- .../akka/dispatch/ThreadPoolBuilder.scala | 7 ++++++- .../akka/testkit/CallingThreadDispatcher.scala | 3 ++- project/JdkOptions.scala | 2 ++ 10 files changed, 44 insertions(+), 18 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index af79807babc..8450d26b686 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -88,8 +88,9 @@ object ActorSystemSpec { override protected[akka] def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean = { - val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint) + hasSystemMessageHint: Boolean, + reschedule: Boolean): Boolean = { + val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint, reschedule) doneIt.switchOn { TestKit.awaitCond(mbox.actor.actor != null, 1.second) mbox.actor.actor match { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala index 3419f2c318f..a08552f5020 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ForkJoinPoolStarvationSpec.scala @@ -14,7 +14,7 @@ object ForkJoinPoolStarvationSpec { |actorhang { | task-dispatcher { | mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" - | throughput = 5 + | throughput = 1000 | fork-join-executor { | parallelism-factor = 2 | parallelism-max = 2 diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index ca110846079..9c1d5447495 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -149,7 +149,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator */ final def attach(actor: ActorCell): Unit = { register(actor) - registerForExecution(actor.mailbox, false, true) + registerForExecution(actor.mailbox, false, true, false) } /** @@ -277,7 +277,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator protected[akka] def resume(actor: ActorCell): Unit = { val mbox = actor.mailbox if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume()) - registerForExecution(mbox, false, false) + registerForExecution(mbox, false, false, false) } /** @@ -302,7 +302,8 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator protected[akka] def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean + hasSystemMessageHint: Boolean, + rescheduled: Boolean): Boolean // TODO check whether this should not actually be a property of the mailbox /** diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 586519de27f..35a7980f053 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -96,7 +96,7 @@ private[akka] class BalancingDispatcher( override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - if (!registerForExecution(receiver.mailbox, false, false)) teamWork() + if (!registerForExecution(receiver.mailbox, false, false, false)) teamWork() } protected def teamWork(): Unit = @@ -108,7 +108,7 @@ private[akka] class BalancingDispatcher( case lm: LoadMetrics => !lm.atFullThrottle() case _ => true }) - && !registerForExecution(i.next.mailbox, false, false)) + && !registerForExecution(i.next.mailbox, false, false, false)) scheduleOne(i) scheduleOne() diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index b23304e36ba..d3132b0f40b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -61,7 +61,7 @@ class Dispatcher( protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) - registerForExecution(mbox, true, false) + registerForExecution(mbox, true, false, false) } /** @@ -70,7 +70,7 @@ class Dispatcher( protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = { val mbox = receiver.mailbox mbox.systemEnqueue(receiver.self, invocation) - registerForExecution(mbox, false, true) + registerForExecution(mbox, false, true, false) } /** @@ -120,11 +120,15 @@ class Dispatcher( protected[akka] override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean = { + hasSystemMessageHint: Boolean, + rescheduled: Boolean): Boolean = { if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { - executorService.execute(mbox) + if (rescheduled) + executorService.executeExternal(mbox) + else + executorService.execute(mbox) true } catch { case _: RejectedExecutionException => diff --git a/akka-actor/src/main/scala/akka/dispatch/ForkJoinExecutorConfigurator.scala b/akka-actor/src/main/scala/akka/dispatch/ForkJoinExecutorConfigurator.scala index 85766b7cce5..d4c6ffbdca3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ForkJoinExecutorConfigurator.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ForkJoinExecutorConfigurator.scala @@ -5,9 +5,10 @@ package akka.dispatch import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory } - import com.typesafe.config.Config +import java.lang.invoke.MethodHandles + object ForkJoinExecutorConfigurator { /** @@ -28,11 +29,22 @@ object ForkJoinExecutorConfigurator { override def execute(r: Runnable): Unit = if (r ne null) - super.execute( - (if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]]) + super.execute(createTask(r)) else throw new NullPointerException("Runnable was null") + def executeExternal(r: Runnable): Unit = + handle.invokeWithArguments(createTask(r)) + + private val handle = { + val m = classOf[ForkJoinPool].getDeclaredMethod("externalPush", classOf[ForkJoinTask[_]]) + m.setAccessible(true) + MethodHandles.lookup().unreflect(m).bindTo(this) + } + + private def createTask(r: Runnable): ForkJoinTask[Any] = + (if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]] + def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 6bde0d22512..2292ce3d2c6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -232,7 +232,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } } finally { setAsIdle() //Volatile write, needed here - dispatcher.registerForExecution(this, false, false) + dispatcher.registerForExecution(this, false, false, true) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 9459e52e4a9..a36edfd87d8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -4,6 +4,8 @@ package akka.dispatch +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool + import java.util.Collection import java.util.concurrent.{ ArrayBlockingQueue, @@ -21,7 +23,6 @@ import java.util.concurrent.{ TimeUnit } import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } - import scala.concurrent.{ BlockContext, CanAwait } import scala.concurrent.duration.Duration @@ -217,6 +218,10 @@ trait ExecutorServiceDelegate extends ExecutorService { def executor: ExecutorService def execute(command: Runnable) = executor.execute(command) + def executeExternal(command: Runnable) = + if (executor.isInstanceOf[AkkaForkJoinPool]) + executor.asInstanceOf[AkkaForkJoinPool].executeExternal(command) + else executor.execute(command) def shutdown(): Unit = { executor.shutdown() } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 9729bda0a00..132564174c3 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -169,7 +169,8 @@ class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) exte protected[akka] override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean = false + hasSystemMessageHint: Boolean, + reschedule: Boolean): Boolean = false protected[akka] override def shutdownTimeout = 1 second diff --git a/project/JdkOptions.scala b/project/JdkOptions.scala index 19c6aa5d8c7..aabc0268ca8 100644 --- a/project/JdkOptions.scala +++ b/project/JdkOptions.scala @@ -33,6 +33,8 @@ object JdkOptions extends AutoPlugin { if (isJdk17orHigher) { // for aeron "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" :: + // for reflective access to ForkJoinPool + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" :: // for LevelDB "--add-opens=java.base/java.nio=ALL-UNNAMED" :: Nil } else Nil