From b743f992bcdabd449ecb2fa6ea09aebec57a7d9f Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Thu, 9 Jan 2025 20:27:58 +0100 Subject: [PATCH] Enable & fix failingPipe web socket tests for zio-http & vertx (#4243) --- .../server/akkahttp/AkkaHttpServerTest.scala | 20 ++---- .../server/http4s/Http4sServerTest.scala | 1 - .../http4s/ztapir/ZHttp4sServerTest.scala | 1 - .../netty/cats/NettyCatsServerTest.scala | 1 - .../netty/sync/NettySyncServerTest.scala | 2 +- .../pekkohttp/PekkoHttpServerTest.scala | 20 ++---- .../tapir/server/play/PlayServerTest.scala | 1 - .../tapir/server/play/PlayServerTest.scala | 1 - .../server/tests/ServerWebSocketTests.scala | 68 +++++++++---------- .../vertx/cats/CatsVertxServerTest.scala | 1 - .../tapir/server/vertx/streams/Pipe.scala | 2 +- .../tapir/server/vertx/streams/package.scala | 13 +++- .../tapir/server/vertx/VertxServerTest.scala | 1 - .../server/vertx/zio/ZioVertxServerTest.scala | 1 - .../server/ziohttp/ZioHttpInterpreter.scala | 11 ++- .../server/ziohttp/ZioHttpServerTest.scala | 1 - 16 files changed, 69 insertions(+), 76 deletions(-) diff --git a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala index b603e61cb8..97768ec461 100644 --- a/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala +++ b/server/akka-http-server/src/test/scala/sttp/tapir/server/akkahttp/AkkaHttpServerTest.scala @@ -156,7 +156,8 @@ class AkkaHttpServerTest extends TestSuite with EitherValues { } }, Test("execute metrics interceptors for empty body and json content type") { - val e = endpoint.post.in(stringBody) + val e = endpoint.post + .in(stringBody) .out(stringBody) .out(header(Header.contentType(MediaType.ApplicationJson))) .serverLogicSuccess[Future](body => Future.successful(body)) @@ -175,18 +176,12 @@ class AkkaHttpServerTest extends TestSuite with EitherValues { me.eval { metric.onRequestCnt.incrementAndGet() EndpointMetric( - onEndpointRequest = Some((_) => - me.eval(metric.onEndpointRequestCnt.incrementAndGet()), - ), - onResponseHeaders = Some((_, _) => - me.eval(metric.onResponseHeadersCnt.incrementAndGet()), - ), - onResponseBody = Some((_, _) => - me.eval(metric.onResponseBodyCnt.incrementAndGet()), - ), - onException = None, + onEndpointRequest = Some((_) => me.eval(metric.onEndpointRequestCnt.incrementAndGet())), + onResponseHeaders = Some((_, _) => me.eval(metric.onResponseHeadersCnt.incrementAndGet())), + onResponseBody = Some((_, _) => me.eval(metric.onResponseBodyCnt.incrementAndGet())), + onException = None ) - }, + } ) val route = AkkaHttpServerInterpreter( AkkaHttpServerOptions.customiseInterceptors @@ -217,7 +212,6 @@ class AkkaHttpServerTest extends TestSuite with EitherValues { createServerTest, AkkaStreams, autoPing = false, - failingPipe = true, handlePong = false ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) diff --git a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala index c89a8fe03b..1a474cbf73 100644 --- a/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala +++ b/server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala @@ -151,7 +151,6 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi createServerTest, Fs2Streams[IO], autoPing = true, - failingPipe = true, handlePong = false ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) diff --git a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala index 5aafe6caa7..fefb5cd8fc 100644 --- a/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala +++ b/server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sServerTest.scala @@ -59,7 +59,6 @@ class ZHttp4sServerTest extends TestSuite with OptionValues { createServerTest, ZioStreams, autoPing = true, - failingPipe = false, handlePong = false ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 73053ed362..d02cb96a3e 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -45,7 +45,6 @@ class NettyCatsServerTest extends TestSuite with EitherValues { createServerTest, Fs2Streams[IO], autoPing = true, - failingPipe = true, handlePong = true ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) diff --git a/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncServerTest.scala b/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncServerTest.scala index 19ded1bbe7..c21f563281 100644 --- a/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncServerTest.scala +++ b/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncServerTest.scala @@ -42,7 +42,7 @@ class NettySyncServerTest extends AsyncFunSuite with BeforeAndAfterAll { new AllServerTests(createServerTest, interpreter, backend, staticContent = false, multipart = false) .tests() ++ new ServerGracefulShutdownTests(createServerTest, sleeper).tests() ++ - new ServerWebSocketTests(createServerTest, OxStreams, autoPing = true, failingPipe = true, handlePong = true) { + new ServerWebSocketTests(createServerTest, OxStreams, autoPing = true, handlePong = true) { override def functionToPipe[A, B](f: A => B): OxStreams.Pipe[A, B] = _.map(f) override def emptyPipe[A, B]: OxStreams.Pipe[A, B] = _ => Flow.empty }.tests() ++ diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala index c3dfd25c28..f6b5d5c30c 100644 --- a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala @@ -105,7 +105,8 @@ class PekkoHttpServerTest extends TestSuite with EitherValues { .unsafeToFuture() }, Test("execute metrics interceptors for empty body and json content type") { - val e = endpoint.post.in(stringBody) + val e = endpoint.post + .in(stringBody) .out(stringBody) .out(header(Header.contentType(MediaType.ApplicationJson))) .serverLogicSuccess[Future](body => Future.successful(body)) @@ -124,18 +125,12 @@ class PekkoHttpServerTest extends TestSuite with EitherValues { me.eval { metric.onRequestCnt.incrementAndGet() EndpointMetric( - onEndpointRequest = Some((_) => - me.eval(metric.onEndpointRequestCnt.incrementAndGet()), - ), - onResponseHeaders = Some((_, _) => - me.eval(metric.onResponseHeadersCnt.incrementAndGet()), - ), - onResponseBody = Some((_, _) => - me.eval(metric.onResponseBodyCnt.incrementAndGet()), - ), - onException = None, + onEndpointRequest = Some((_) => me.eval(metric.onEndpointRequestCnt.incrementAndGet())), + onResponseHeaders = Some((_, _) => me.eval(metric.onResponseHeadersCnt.incrementAndGet())), + onResponseBody = Some((_, _) => me.eval(metric.onResponseBodyCnt.incrementAndGet())), + onException = None ) - }, + } ) val route = PekkoHttpServerInterpreter( PekkoHttpServerOptions.customiseInterceptors @@ -166,7 +161,6 @@ class PekkoHttpServerTest extends TestSuite with EitherValues { createServerTest, PekkoStreams, autoPing = false, - failingPipe = true, handlePong = false ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 7ef2dc432d..5fdf8188b3 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -126,7 +126,6 @@ class PlayServerTest extends TestSuite { createServerTest, PekkoStreams, autoPing = false, - failingPipe = true, handlePong = false ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) diff --git a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index c25bd41529..7d312c06ac 100644 --- a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -118,7 +118,6 @@ class PlayServerTest extends TestSuite { createServerTest, AkkaStreams, autoPing = false, - failingPipe = true, handlePong = false ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) diff --git a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala index 55b7df5162..1a22efe1a3 100644 --- a/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala +++ b/server/tests/src/main/scala/sttp/tapir/server/tests/ServerWebSocketTests.scala @@ -26,7 +26,6 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE]( createServerTest: CreateServerTest[F, S with WebSockets, OPTIONS, ROUTE], val streams: S, autoPing: Boolean, - failingPipe: Boolean, handlePong: Boolean, // Disabled for eaxmple for vert.x, which sometimes drops connection without returning Close expectCloseResponse: Boolean = true, @@ -281,41 +280,40 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE]( else List.empty // Optional, because some backends don't handle exceptions in the pipe gracefully, they just swallow it silently and hang forever - val failingPipeTests = - if (failingPipe) - List( - testServer( - endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)), - "failing pipe" - )((_: Unit) => - pureResult(functionToPipe[String, String] { - case "error-trigger" => throw new Exception("Boom!") - case msg => s"echo: $msg" - }.asRight[Unit]) - ) { (backend, baseUri) => - basicRequest - .response(asWebSocket { (ws: WebSocket[IO]) => - for { - _ <- ws.sendText("test1") - _ <- ws.sendText("test2") - _ <- ws.sendText("error-trigger") - m1 <- ws.eitherClose(ws.receiveText()) - m2 <- ws.eitherClose(ws.receiveText()) - m3 <- ws.eitherClose(ws.receiveText()) - } yield List(m1, m2, m3) - }) - .get(baseUri.scheme("ws")) - .send(backend) - .map { r => - val results = r.body.map(_.map(_.left.map(_.statusCode))).value - results.take(2) shouldBe - List(Right("echo: test1"), Right("echo: test2")) - val closeCode = results.last.left.value - assert(closeCode == 1000 || closeCode == 1011) // some servers respond with Close(normal), some with Close(error) - } + val failingPipeTests = List( + testServer( + endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)), + "failing pipe" + )((_: Unit) => + pureResult(functionToPipe[String, String] { + case "error-trigger" => throw new Exception("Boom!") + case msg => s"echo: $msg" + }.asRight[Unit]) + ) { (backend, baseUri) => + basicRequest + .response(asWebSocket { (ws: WebSocket[IO]) => + for { + _ <- ws.sendText("test1") + _ <- ws.sendText("test2") + _ <- ws.sendText("error-trigger") + m1 <- ws.eitherClose(ws.receiveText()) + m2 <- ws.eitherClose(ws.receiveText()) + m3 <- ws.eitherClose(ws.receiveText()) + } yield List(m1, m2, m3) + }) + .get(baseUri.scheme("ws")) + .send(backend) + .map { r => + val results = r.body.map(_.map(_.left.map(_.statusCode))).value + results.take(2) shouldBe + List(Right("echo: test1"), Right("echo: test2")) + val closeCode = results.last.left.value + assert( + closeCode == 1000 || closeCode == 1006 || closeCode == 1011 + ) // some servers respond with Close(normal), some with Close(error), and zio+http4s with Close(abnormal) } - ) - else List.empty + } + ) val frameConcatenationTests = if (frameConcatenation) diff --git a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala index 4044072ac4..b73fa92528 100644 --- a/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala +++ b/server/vertx-server/cats/src/test/scala/sttp/tapir/server/vertx/cats/CatsVertxServerTest.scala @@ -41,7 +41,6 @@ class CatsVertxServerTest extends TestSuite { createServerTest, Fs2Streams.apply[IO], autoPing = false, - failingPipe = false, handlePong = true, expectCloseResponse = false, frameConcatenation = false diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/Pipe.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/Pipe.scala index 1c1b1f184b..e78b009e3a 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/Pipe.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/Pipe.scala @@ -164,7 +164,7 @@ object Pipe { }) request.endHandler { _ => val state = progress.updateAndGet(_.copy(completed = true)) - if (state.inProgress == 0) socket.end() + if (state.inProgress == 0) { val _ = socket.close(1011.toShort) } () } request.exceptionHandler { _ => diff --git a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/package.scala b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/package.scala index 8165d15bcb..e597b8db87 100644 --- a/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/package.scala +++ b/server/vertx-server/src/main/scala/sttp/tapir/server/vertx/streams/package.scala @@ -26,7 +26,7 @@ package object streams { pipe: streams.Pipe[REQ, RESP], o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, VertxStreams] ): ReadStream[WebSocketFrame] = { - val stream0 = optionallyContatenateFrames(readStream, o.concatenateFragmentedFrames) + val stream0 = optionallyConcatenateFrames(readStream, o.concatenateFragmentedFrames) val stream1 = optionallyIgnorePong(stream0, o.ignorePong) val stream2 = optionallyAutoPing(stream1, o.autoPing) @@ -45,7 +45,7 @@ package object streams { } } - private def optionallyContatenateFrames(rs: ReadStream[WebSocketFrame], doConcatenate: Boolean): ReadStream[WebSocketFrame] = { + private def optionallyConcatenateFrames(rs: ReadStream[WebSocketFrame], doConcatenate: Boolean): ReadStream[WebSocketFrame] = { // TODO implement this rs } @@ -68,15 +68,22 @@ package object streams { * ReadStream doesn't offer a `map` function */ class ReadStreamMapping[A, B](source: ReadStream[A], mapping: A => B) extends ReadStream[B] { + private var exceptionHandler: Handler[Throwable] = null override def handler(handler: Handler[B]): ReadStream[B] = { if (handler == null) { source.handler(null) } else { - source.handler(event => handler.handle(mapping.apply(event))) + source.handler { event => + try handler.handle(mapping.apply(event)) + catch { + case e: Throwable => exceptionHandler.handle(e) + } + } } this } override def exceptionHandler(handler: Handler[Throwable]): ReadStream[B] = { + this.exceptionHandler = handler source.exceptionHandler(handler) this } diff --git a/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala b/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala index fca79709f5..082d9cf58e 100644 --- a/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala +++ b/server/vertx-server/src/test/scala/sttp/tapir/server/vertx/VertxServerTest.scala @@ -57,7 +57,6 @@ class VertxServerTest extends TestSuite { createServerTest, VertxStreams, autoPing = false, - failingPipe = false, handlePong = false, expectCloseResponse = false, frameConcatenation = false diff --git a/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala b/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala index 66c06f9c29..8f9cbedab3 100644 --- a/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala +++ b/server/vertx-server/zio/src/test/scala/sttp/tapir/server/vertx/zio/ZioVertxServerTest.scala @@ -47,7 +47,6 @@ class ZioVertxServerTest extends TestSuite with OptionValues { createServerTest, ZioStreams, autoPing = true, - failingPipe = false, handlePong = false, expectCloseResponse = false, frameConcatenation = false diff --git a/server/zio-http-server/src/main/scala/sttp/tapir/server/ziohttp/ZioHttpInterpreter.scala b/server/zio-http-server/src/main/scala/sttp/tapir/server/ziohttp/ZioHttpInterpreter.scala index e03ecf7c28..636c054dde 100644 --- a/server/zio-http-server/src/main/scala/sttp/tapir/server/ziohttp/ZioHttpInterpreter.scala +++ b/server/zio-http-server/src/main/scala/sttp/tapir/server/ziohttp/ZioHttpInterpreter.scala @@ -190,7 +190,16 @@ trait ZioHttpInterpreter[R] { channelEventsQueue <- zio.Queue.unbounded[WebSocketChannelEvent] messageReceptionFiber <- channel.receiveAll { message => channelEventsQueue.offer(message) }.fork webSocketStream <- webSocketHandler(stream.ZStream.fromQueue(channelEventsQueue)) - _ <- webSocketStream.mapZIO(channel.send).runDrain + _ <- webSocketStream + .mapZIO(channel.send) + .runDrain + .resurrect + .catchAll { e => + channel.send(ChannelEvent.Read(WebSocketFrame.Close(1011, Some("Internal server error")))) *> ZIO.logErrorCause( + "Exception when handling a WebSocket", + Cause.fail(e) + ) + } } yield messageReceptionFiber.join } webSocketConfig.fold(app)(app.withConfig).toResponse diff --git a/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala b/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala index eb64d68777..04e3beb805 100644 --- a/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala +++ b/server/zio-http-server/src/test/scala/sttp/tapir/server/ziohttp/ZioHttpServerTest.scala @@ -314,7 +314,6 @@ class ZioHttpServerTest extends TestSuite { createServerTest, ZioStreams, autoPing = true, - failingPipe = false, handlePong = false, frameConcatenation = false ) {