Skip to content

Commit

Permalink
Enable & fix failingPipe web socket tests for zio-http & vertx (#4243)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Jan 9, 2025
1 parent 41757ee commit b743f99
Show file tree
Hide file tree
Showing 16 changed files with 69 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class AkkaHttpServerTest extends TestSuite with EitherValues {
}
},
Test("execute metrics interceptors for empty body and json content type") {
val e = endpoint.post.in(stringBody)
val e = endpoint.post
.in(stringBody)
.out(stringBody)
.out(header(Header.contentType(MediaType.ApplicationJson)))
.serverLogicSuccess[Future](body => Future.successful(body))
Expand All @@ -175,18 +176,12 @@ class AkkaHttpServerTest extends TestSuite with EitherValues {
me.eval {
metric.onRequestCnt.incrementAndGet()
EndpointMetric(
onEndpointRequest = Some((_) =>
me.eval(metric.onEndpointRequestCnt.incrementAndGet()),
),
onResponseHeaders = Some((_, _) =>
me.eval(metric.onResponseHeadersCnt.incrementAndGet()),
),
onResponseBody = Some((_, _) =>
me.eval(metric.onResponseBodyCnt.incrementAndGet()),
),
onException = None,
onEndpointRequest = Some((_) => me.eval(metric.onEndpointRequestCnt.incrementAndGet())),
onResponseHeaders = Some((_, _) => me.eval(metric.onResponseHeadersCnt.incrementAndGet())),
onResponseBody = Some((_, _) => me.eval(metric.onResponseBodyCnt.incrementAndGet())),
onException = None
)
},
}
)
val route = AkkaHttpServerInterpreter(
AkkaHttpServerOptions.customiseInterceptors
Expand Down Expand Up @@ -217,7 +212,6 @@ class AkkaHttpServerTest extends TestSuite with EitherValues {
createServerTest,
AkkaStreams,
autoPing = false,
failingPipe = true,
handlePong = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi
createServerTest,
Fs2Streams[IO],
autoPing = true,
failingPipe = true,
handlePong = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class ZHttp4sServerTest extends TestSuite with OptionValues {
createServerTest,
ZioStreams,
autoPing = true,
failingPipe = false,
handlePong = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class NettyCatsServerTest extends TestSuite with EitherValues {
createServerTest,
Fs2Streams[IO],
autoPing = true,
failingPipe = true,
handlePong = true
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class NettySyncServerTest extends AsyncFunSuite with BeforeAndAfterAll {
new AllServerTests(createServerTest, interpreter, backend, staticContent = false, multipart = false)
.tests() ++
new ServerGracefulShutdownTests(createServerTest, sleeper).tests() ++
new ServerWebSocketTests(createServerTest, OxStreams, autoPing = true, failingPipe = true, handlePong = true) {
new ServerWebSocketTests(createServerTest, OxStreams, autoPing = true, handlePong = true) {
override def functionToPipe[A, B](f: A => B): OxStreams.Pipe[A, B] = _.map(f)
override def emptyPipe[A, B]: OxStreams.Pipe[A, B] = _ => Flow.empty
}.tests() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class PekkoHttpServerTest extends TestSuite with EitherValues {
.unsafeToFuture()
},
Test("execute metrics interceptors for empty body and json content type") {
val e = endpoint.post.in(stringBody)
val e = endpoint.post
.in(stringBody)
.out(stringBody)
.out(header(Header.contentType(MediaType.ApplicationJson)))
.serverLogicSuccess[Future](body => Future.successful(body))
Expand All @@ -124,18 +125,12 @@ class PekkoHttpServerTest extends TestSuite with EitherValues {
me.eval {
metric.onRequestCnt.incrementAndGet()
EndpointMetric(
onEndpointRequest = Some((_) =>
me.eval(metric.onEndpointRequestCnt.incrementAndGet()),
),
onResponseHeaders = Some((_, _) =>
me.eval(metric.onResponseHeadersCnt.incrementAndGet()),
),
onResponseBody = Some((_, _) =>
me.eval(metric.onResponseBodyCnt.incrementAndGet()),
),
onException = None,
onEndpointRequest = Some((_) => me.eval(metric.onEndpointRequestCnt.incrementAndGet())),
onResponseHeaders = Some((_, _) => me.eval(metric.onResponseHeadersCnt.incrementAndGet())),
onResponseBody = Some((_, _) => me.eval(metric.onResponseBodyCnt.incrementAndGet())),
onException = None
)
},
}
)
val route = PekkoHttpServerInterpreter(
PekkoHttpServerOptions.customiseInterceptors
Expand Down Expand Up @@ -166,7 +161,6 @@ class PekkoHttpServerTest extends TestSuite with EitherValues {
createServerTest,
PekkoStreams,
autoPing = false,
failingPipe = true,
handlePong = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ class PlayServerTest extends TestSuite {
createServerTest,
PekkoStreams,
autoPing = false,
failingPipe = true,
handlePong = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ class PlayServerTest extends TestSuite {
createServerTest,
AkkaStreams,
autoPing = false,
failingPipe = true,
handlePong = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
createServerTest: CreateServerTest[F, S with WebSockets, OPTIONS, ROUTE],
val streams: S,
autoPing: Boolean,
failingPipe: Boolean,
handlePong: Boolean,
// Disabled for eaxmple for vert.x, which sometimes drops connection without returning Close
expectCloseResponse: Boolean = true,
Expand Down Expand Up @@ -281,41 +280,40 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
else List.empty

// Optional, because some backends don't handle exceptions in the pipe gracefully, they just swallow it silently and hang forever
val failingPipeTests =
if (failingPipe)
List(
testServer(
endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)),
"failing pipe"
)((_: Unit) =>
pureResult(functionToPipe[String, String] {
case "error-trigger" => throw new Exception("Boom!")
case msg => s"echo: $msg"
}.asRight[Unit])
) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- ws.sendText("test2")
_ <- ws.sendText("error-trigger")
m1 <- ws.eitherClose(ws.receiveText())
m2 <- ws.eitherClose(ws.receiveText())
m3 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m2, m3)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map { r =>
val results = r.body.map(_.map(_.left.map(_.statusCode))).value
results.take(2) shouldBe
List(Right("echo: test1"), Right("echo: test2"))
val closeCode = results.last.left.value
assert(closeCode == 1000 || closeCode == 1011) // some servers respond with Close(normal), some with Close(error)
}
val failingPipeTests = List(
testServer(
endpoint.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)),
"failing pipe"
)((_: Unit) =>
pureResult(functionToPipe[String, String] {
case "error-trigger" => throw new Exception("Boom!")
case msg => s"echo: $msg"
}.asRight[Unit])
) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- ws.sendText("test2")
_ <- ws.sendText("error-trigger")
m1 <- ws.eitherClose(ws.receiveText())
m2 <- ws.eitherClose(ws.receiveText())
m3 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m2, m3)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map { r =>
val results = r.body.map(_.map(_.left.map(_.statusCode))).value
results.take(2) shouldBe
List(Right("echo: test1"), Right("echo: test2"))
val closeCode = results.last.left.value
assert(
closeCode == 1000 || closeCode == 1006 || closeCode == 1011
) // some servers respond with Close(normal), some with Close(error), and zio+http4s with Close(abnormal)
}
)
else List.empty
}
)

val frameConcatenationTests =
if (frameConcatenation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class CatsVertxServerTest extends TestSuite {
createServerTest,
Fs2Streams.apply[IO],
autoPing = false,
failingPipe = false,
handlePong = true,
expectCloseResponse = false,
frameConcatenation = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ object Pipe {
})
request.endHandler { _ =>
val state = progress.updateAndGet(_.copy(completed = true))
if (state.inProgress == 0) socket.end()
if (state.inProgress == 0) { val _ = socket.close(1011.toShort) }
()
}
request.exceptionHandler { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package object streams {
pipe: streams.Pipe[REQ, RESP],
o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, VertxStreams]
): ReadStream[WebSocketFrame] = {
val stream0 = optionallyContatenateFrames(readStream, o.concatenateFragmentedFrames)
val stream0 = optionallyConcatenateFrames(readStream, o.concatenateFragmentedFrames)
val stream1 = optionallyIgnorePong(stream0, o.ignorePong)
val stream2 = optionallyAutoPing(stream1, o.autoPing)

Expand All @@ -45,7 +45,7 @@ package object streams {
}
}

private def optionallyContatenateFrames(rs: ReadStream[WebSocketFrame], doConcatenate: Boolean): ReadStream[WebSocketFrame] = {
private def optionallyConcatenateFrames(rs: ReadStream[WebSocketFrame], doConcatenate: Boolean): ReadStream[WebSocketFrame] = {
// TODO implement this
rs
}
Expand All @@ -68,15 +68,22 @@ package object streams {
* ReadStream doesn't offer a `map` function
*/
class ReadStreamMapping[A, B](source: ReadStream[A], mapping: A => B) extends ReadStream[B] {
private var exceptionHandler: Handler[Throwable] = null
override def handler(handler: Handler[B]): ReadStream[B] = {
if (handler == null) {
source.handler(null)
} else {
source.handler(event => handler.handle(mapping.apply(event)))
source.handler { event =>
try handler.handle(mapping.apply(event))
catch {
case e: Throwable => exceptionHandler.handle(e)
}
}
}
this
}
override def exceptionHandler(handler: Handler[Throwable]): ReadStream[B] = {
this.exceptionHandler = handler
source.exceptionHandler(handler)
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class VertxServerTest extends TestSuite {
createServerTest,
VertxStreams,
autoPing = false,
failingPipe = false,
handlePong = false,
expectCloseResponse = false,
frameConcatenation = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class ZioVertxServerTest extends TestSuite with OptionValues {
createServerTest,
ZioStreams,
autoPing = true,
failingPipe = false,
handlePong = false,
expectCloseResponse = false,
frameConcatenation = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,16 @@ trait ZioHttpInterpreter[R] {
channelEventsQueue <- zio.Queue.unbounded[WebSocketChannelEvent]
messageReceptionFiber <- channel.receiveAll { message => channelEventsQueue.offer(message) }.fork
webSocketStream <- webSocketHandler(stream.ZStream.fromQueue(channelEventsQueue))
_ <- webSocketStream.mapZIO(channel.send).runDrain
_ <- webSocketStream
.mapZIO(channel.send)
.runDrain
.resurrect
.catchAll { e =>
channel.send(ChannelEvent.Read(WebSocketFrame.Close(1011, Some("Internal server error")))) *> ZIO.logErrorCause(
"Exception when handling a WebSocket",
Cause.fail(e)
)
}
} yield messageReceptionFiber.join
}
webSocketConfig.fold(app)(app.withConfig).toResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ class ZioHttpServerTest extends TestSuite {
createServerTest,
ZioStreams,
autoPing = true,
failingPipe = false,
handlePong = false,
frameConcatenation = false
) {
Expand Down

0 comments on commit b743f99

Please sign in to comment.