From 9dc20db6f629b68f914f3f73f8fa9c938c306134 Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Wed, 19 Apr 2023 09:09:54 -0400 Subject: [PATCH 1/3] Use parasitic context by default for pipeTo --- .../scala/akka/pattern/PipeToSupport.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala index e192fda77bb..2f7abbe4f1c 100644 --- a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala @@ -14,6 +14,7 @@ import language.implicitConversions import akka.actor.{ Actor, ActorRef, Status } import akka.actor.ActorSelection +import akka.dispatch.ExecutionContexts import akka.util.unused trait PipeToSupport { @@ -78,7 +79,6 @@ trait PipeToSupport { * * {{{ * import akka.pattern.pipe - * // requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor * * Future { doExpensiveCalc() } pipeTo nextActor * @@ -90,16 +90,20 @@ trait PipeToSupport { * * The successful result of the future is sent as a message to the recipient, or * the failure is sent in a [[akka.actor.Status.Failure]] to the recipient. + * + * By default this uses a [[scala.concurrent.ExecutionContext]] which sends the message on the + * calling thread if the future has already completed, or on the thread which completes the future + * if the future has not yet completed. */ - implicit def pipe[T](future: Future[T])(implicit executionContext: ExecutionContext): PipeableFuture[T] = + implicit def pipe[T](future: Future[T])( + implicit executionContext: ExecutionContext = ExecutionContexts.parasitic): PipeableFuture[T] = new PipeableFuture(future) /** - * Import this implicit conversion to gain the `pipeTo` method on [[scala.concurrent.Future]]: + * Import this implicit conversion to gain the `pipeTo` method on [[java.util.concurrent.CompletionStage]]: * * {{{ * import akka.pattern.pipe - * // requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor * * Future { doExpensiveCalc() } pipeTo nextActor * @@ -111,7 +115,12 @@ trait PipeToSupport { * * The successful result of the future is sent as a message to the recipient, or * the failure is sent in a [[akka.actor.Status.Failure]] to the recipient. + * + * Regardless of the passed [[scala.concurrent.ExecutionContext]], the message will be + * sent from the calling thread if the future has already completed, or on the thread which + * completes the future if the future has not yet completed. */ implicit def pipeCompletionStage[T](future: CompletionStage[T])( - implicit executionContext: ExecutionContext): PipeableCompletionStage[T] = new PipeableCompletionStage(future) + implicit executionContext: ExecutionContext = ExecutionContexts.parasitic): PipeableCompletionStage[T] = + new PipeableCompletionStage(future) } From c9a359583e0ca0f48280899bc5ba2eef4e899d52 Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Wed, 19 Apr 2023 12:35:04 -0400 Subject: [PATCH 2/3] tests --- .../test/scala/akka/pattern/PipeToSpec.scala | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala index 6de58cef2f3..9128d74ff10 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala @@ -12,13 +12,22 @@ import akka.testkit.TestProbe class PipeToSpec extends AkkaSpec { - import system.dispatcher + // Do not try this at home, kids. In real code, this should be a `Future.successful` + def future42(): Future[Int] = Future(42)(system.dispatcher) "PipeTo" must { "work" in { val p = TestProbe() - Future(42).pipeTo(p.ref) + future42().pipeTo(p.ref) + p.expectMsg(42) + } + + "work with an implicit ExecutionContext" in { + import system.dispatcher // installs an EC in implicit scope + + val p = TestProbe() + future42().pipeTo(p.ref) p.expectMsg(42) } @@ -31,20 +40,20 @@ class PipeToSpec extends AkkaSpec { "pick up an implicit sender()" in { val p = TestProbe() implicit val s = testActor - Future(42).pipeTo(p.ref) + future42().pipeTo(p.ref) p.expectMsg(42) p.lastSender should ===(s) } "work in Java form" in { val p = TestProbe() - pipe(Future(42)) to p.ref + pipe(future42()) to p.ref p.expectMsg(42) } "work in Java form with sender()" in { val p = TestProbe() - pipe(Future(42)).to(p.ref, testActor) + pipe(future42()).to(p.ref, testActor) p.expectMsg(42) p.lastSender should ===(testActor) } @@ -56,7 +65,16 @@ class PipeToSpec extends AkkaSpec { "work" in { val p = TestProbe() val sel = system.actorSelection(p.ref.path) - Future(42).pipeToSelection(sel) + future42().pipeToSelection(sel) + p.expectMsg(42) + } + + "work with an implicit ExecutionContext" in { + import system.dispatcher + + val p = TestProbe() + val sel = system.actorSelection(p.ref.path) + future42().pipeToSelection(sel) p.expectMsg(42) } @@ -71,7 +89,7 @@ class PipeToSpec extends AkkaSpec { val p = TestProbe() val sel = system.actorSelection(p.ref.path) implicit val s = testActor - Future(42).pipeToSelection(sel) + future42().pipeToSelection(sel) p.expectMsg(42) p.lastSender should ===(s) } @@ -79,14 +97,14 @@ class PipeToSpec extends AkkaSpec { "work in Java form" in { val p = TestProbe() val sel = system.actorSelection(p.ref.path) - pipe(Future(42)) to sel + pipe(future42()) to sel p.expectMsg(42) } "work in Java form with sender()" in { val p = TestProbe() val sel = system.actorSelection(p.ref.path) - pipe(Future(42)).to(sel, testActor) + pipe(future42()).to(sel, testActor) p.expectMsg(42) p.lastSender should ===(testActor) } From be157867172b8eece0566f7f7f8a4fa6b3b5d017 Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Wed, 19 Apr 2023 13:18:10 -0400 Subject: [PATCH 3/3] stylin --- akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala index 9128d74ff10..57a7568df91 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PipeToSpec.scala @@ -24,7 +24,7 @@ class PipeToSpec extends AkkaSpec { } "work with an implicit ExecutionContext" in { - import system.dispatcher // installs an EC in implicit scope + import system.dispatcher // installs an EC in implicit scope val p = TestProbe() future42().pipeTo(p.ref)