From 20ac7e66afb4e087bdf6d9af2714a3ea36134a85 Mon Sep 17 00:00:00 2001 From: pbernet Date: Fri, 23 Feb 2024 19:18:55 +0100 Subject: [PATCH] Add X-Correlation-ID --- src/main/scala/akkahttp/ReverseProxy.scala | 78 ++++++++++++++-------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/src/main/scala/akkahttp/ReverseProxy.scala b/src/main/scala/akkahttp/ReverseProxy.scala index 08169dbc..10248bd9 100644 --- a/src/main/scala/akkahttp/ReverseProxy.scala +++ b/src/main/scala/akkahttp/ReverseProxy.scala @@ -26,6 +26,7 @@ import scala.util.{Failure, Success} * HTTP reverse proxy PoC with: * - weighted round robin load balancing * - retry on 5xx + * - X-Correlation-ID (only for local) * * Setup local: * HTTP client(s) --> ReverseProxy --> local target server(s) @@ -34,7 +35,7 @@ import scala.util.{Failure, Success} * HTTP client(s) --> ReverseProxy --> httpstat.us * * curl client: - * curl -H "Host: local" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/mypath + * curl -H "Host: local" -H "X-Correlation-ID: 1-1" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/mypath * curl -H "Host: external" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/200 * * wrk perf client: @@ -49,44 +50,52 @@ object ReverseProxy extends App { implicit val http = Http(system) val circuitBreakers = new ConcurrentHashMap[String, CircuitBreaker]() + val counter = new AtomicInteger(0) val proxyHost = "127.0.0.1" val proxyPort = 8080 + // HTTP client(s) val targetHostLocal = "local" val targetHostExternal = "external" - val counter = new AtomicInteger(0) - - // HTTP client(s) val clients = 1 to 2 clients.par.foreach(clientID => httpClient(clientID, proxyHost, proxyPort, targetHostLocal, 10)) def httpClient(clientId: Int, proxyHost: String, proxyPort: Int, targetHost: String, nbrOfRequests: Int) = { + def logResponse(response: HttpResponse) = { + val id = if (response.getHeader("X-Correlation-ID").isPresent) { + response.getHeader("X-Correlation-ID").get().value() + } else { + "N/A" + } + logger.info(s"Client: $clientId got response with id: $id and status: ${response.status.intValue()}") + } + Source(1 to nbrOfRequests) .throttle(1, 1.seconds) - .wireTap(each => logger.info(s"Client: $clientId about to send $each/$nbrOfRequests request...")) - .mapAsync(1)(_ => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/").withHeaders(Seq(RawHeader("Host", targetHost))))) - .wireTap(response => logger.info(s"Client: $clientId got response: ${response.status.intValue()}")) + .wireTap(each => logger.info(s"Client: $clientId about to send request with id: $clientId-$each...")) + .mapAsync(1)(each => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/").withHeaders(Seq(RawHeader("Host", targetHost), RawHeader("X-Correlation-ID", s"$clientId-$each"))))) + .wireTap(response => logResponse(response)) // https://nightlies.apache.org/pekko/docs/pekko-http/1.0/docs/implications-of-streaming-http-entity.html .runForeach(response => response.discardEntityBytes()) } // ReverseProxy - def NotFound(path: String) = HttpResponse( + def NotFound(id: String = "N/A", path: String) = HttpResponse( 404, entity = HttpEntity(ContentTypes.`application/json`, Json.obj("error" -> Json.fromString(s"$path not found")).noSpaces) - ) + ).withHeaders(Seq(RawHeader("X-Correlation-ID", id))) - def GatewayTimeout() = HttpResponse( + def GatewayTimeout(id: String = "N/A") = HttpResponse( 504, entity = HttpEntity(ContentTypes.`application/json`, Json.obj("error" -> Json.fromString(s"Target servers timeout")).noSpaces) - ) + ).withHeaders(Seq(RawHeader("X-Correlation-ID", id))) - def BadGateway(message: String) = HttpResponse( + def BadGateway(id: String = "N/A", message: String) = HttpResponse( 502, entity = HttpEntity(ContentTypes.`application/json`, Json.obj("error" -> Json.fromString(message)).noSpaces) - ) + ).withHeaders(Seq(RawHeader("X-Correlation-ID", id))) val services: Map[String, Seq[Target]] = Map( targetHostLocal -> Seq( @@ -101,17 +110,22 @@ object ReverseProxy extends App { ) ) - def getHost(request: HttpRequest): String = request.header[Host].map(_.host.address()).getOrElse("--") - def handlerWithCircuitBreaker(request: HttpRequest): Future[HttpResponse] = { - val host = getHost(request) + val host = request.header[Host].map(_.host.address()).getOrElse("N/A") + val id = if (request.getHeader("X-Correlation-ID").isPresent) { + request.getHeader("X-Correlation-ID").get().value() + } else { + "N/A" + } def headers(target: Target) = { val headersIn: Seq[HttpHeader] = request.headers.filterNot(t => t.name() == "Host") :+ Host(target.host, target.port) :+ RawHeader("X-Fowarded-Host", host) :+ - RawHeader("X-Fowarded-Scheme", request.uri.scheme) + RawHeader("X-Fowarded-Scheme", request.uri.scheme) :+ + RawHeader("X-Correlation-ID", id) + // Filter, to avoid log noise, see: https://github.com/akka/akka-http/issues/64 val filteredHeaders = headersIn.toList.filterNot(each => each.name() == "Timeout-Access") filteredHeaders @@ -132,17 +146,18 @@ object ReverseProxy extends App { val target = seq.apply(index) val circuitBreaker = circuitBreakers.computeIfAbsent(target.url, _ => new CircuitBreaker( system.scheduler, + // TODO 'maxFailures' vs 'times' above maxFailures = 5, callTimeout = 30.seconds, resetTimeout = 10.seconds)) val proxyReq = request.withUri(uri(target)).withHeaders(headers(target)) circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq)) }.recover { - case _: akka.pattern.CircuitBreakerOpenException => BadGateway("Circuit breaker opened") - case _: TimeoutException => GatewayTimeout() - case e => BadGateway(e.getMessage) + case _: akka.pattern.CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened") + case _: TimeoutException => GatewayTimeout(id) + case e => BadGateway(id, e.getMessage) } - case None => Future.successful(NotFound(host)) + case None => Future.successful(NotFound(id, host)) } } @@ -158,14 +173,20 @@ object ReverseProxy extends App { // Local target servers (with faulty behaviour) val echoRoute: Route = - extractUri { uri => + extractRequest { request => + complete { + val id = if (request.getHeader("X-Correlation-ID").isPresent) { + request.getHeader("X-Correlation-ID").get().value() + } else { + "N/A" + } + // Adjust to provoke more retries on ReverseProxy val codes = List(200, 200, 200, 500, 500, 500) val randomResponse = codes(new scala.util.Random().nextInt(codes.length)) - logger.info(s"" + - s"Target server listening on: ${uri.authority.host}:${uri.effectivePort} got echo request, reply with: $randomResponse") - StatusCode.int2StatusCode(randomResponse) + logger.info(s"Target server listening on: ${request.uri.authority.host}:${request.uri.effectivePort} got echo request with id: $id, reply with: $randomResponse") + (StatusCode.int2StatusCode(randomResponse), Seq(RawHeader("X-Correlation-ID", id))) } } @@ -224,8 +245,13 @@ object Retry { case Success(t) => // we need to cast here val code = t.asInstanceOf[HttpResponse].status.intValue() + val id = if (t.asInstanceOf[HttpResponse].getHeader("X-Correlation-ID").isPresent) { + t.asInstanceOf[HttpResponse].getHeader("X-Correlation-ID").get().value() + } else { + "N/A" + } if (code >= 500) { - logger.info(s"Got 5xx server error from target server. Retries left: ${times - 1}") + logger.info(s"Got 5xx server error from target server for id: $id. Retries left: ${times - 1}") retryPromise[T](times - 1, promise, Some(new RuntimeException("Got 500")), f) } else { promise.trySuccess(t)