diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala index 28f22d642af..2cf80a96d37 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala @@ -13,6 +13,15 @@ package org.apache.pekko.stream.scaladsl +import java.util.concurrent.atomic.AtomicInteger + +import scala.annotation.nowarn +import scala.concurrent.Await +import scala.concurrent.Promise +import scala.concurrent.duration.DurationInt +import scala.util.Success +import scala.util.control.NoStackTrace + import org.apache.pekko import pekko.Done import pekko.stream.AbruptStageTerminationException @@ -21,16 +30,10 @@ import pekko.stream.ActorMaterializer import pekko.stream.Supervision import pekko.stream.testkit.StreamSpec import pekko.stream.testkit.TestSubscriber +import pekko.stream.testkit.Utils.TE import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.testkit.scaladsl.TestSource - -import java.util.concurrent.atomic.AtomicInteger -import scala.annotation.nowarn -import scala.concurrent.Await -import scala.concurrent.Promise -import scala.concurrent.duration.DurationInt -import scala.util.Success -import scala.util.control.NoStackTrace +import pekko.testkit.EventFilter class FlowStatefulMapSpec extends StreamSpec { @@ -371,5 +374,64 @@ class FlowStatefulMapSpec extends StreamSpec { .expectComplete() gate.ensure() } + + "will not call `onComplete` twice if `f` fail" in { + val closedCounter = new AtomicInteger(0) + val probe = Source + .repeat(1) + .statefulMap(() => "opening resource")( + (_, _) => throw TE("failing read"), + _ => { + closedCounter.incrementAndGet() + None + }) + .runWith(TestSink.probe[String]) + + probe.request(1) + probe.expectError(TE("failing read")) + closedCounter.get() should ===(1) + } + + "will not call `onComplete` twice if both `f` and `onComplete` fail" in { + val closedCounter = new AtomicInteger(0) + val probe = Source + .repeat(1) + .statefulMap(() => "opening resource")((_, _) => throw TE("failing read"), + _ => { + if (closedCounter.incrementAndGet() == 1) { + throw TE("boom") + } + None + }) + .runWith(TestSink.probe[Int]) + + EventFilter[TE](occurrences = 1).intercept { + probe.request(1) + probe.expectError(TE("boom")) + } + closedCounter.get() should ===(1) + } + + "will not call `onComplete` twice if `onComplete` fail on upstream complete" in { + val closedCounter = new AtomicInteger(0) + val (pub, sub) = TestSource[Int]() + .statefulMap(() => "opening resource")((state, value) => (state, value), + _ => { + closedCounter.incrementAndGet() + throw TE("boom") + }) + .toMat(TestSink.probe[Int])(Keep.both) + .run() + + EventFilter[TE](occurrences = 1).intercept { + sub.request(1) + pub.sendNext(1) + sub.expectNext(1) + sub.request(1) + pub.sendComplete() + sub.expectError(TE("boom")) + } + closedCounter.get() shouldBe 1 + } } }