Skip to content

Commit

Permalink
Add X-Correlation-ID
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Feb 23, 2024
1 parent d2c01dd commit 20ac7e6
Showing 1 changed file with 52 additions and 26 deletions.
78 changes: 52 additions & 26 deletions src/main/scala/akkahttp/ReverseProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.util.{Failure, Success}
* HTTP reverse proxy PoC with:
* - weighted round robin load balancing
* - retry on 5xx
* - X-Correlation-ID (only for local)
*
* Setup local:
* HTTP client(s) --> ReverseProxy --> local target server(s)
Expand All @@ -34,7 +35,7 @@ import scala.util.{Failure, Success}
* HTTP client(s) --> ReverseProxy --> httpstat.us
*
* curl client:
* curl -H "Host: local" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/mypath
* curl -H "Host: local" -H "X-Correlation-ID: 1-1" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/mypath
* curl -H "Host: external" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/200
*
* wrk perf client:
Expand All @@ -49,44 +50,52 @@ object ReverseProxy extends App {
implicit val http = Http(system)

val circuitBreakers = new ConcurrentHashMap[String, CircuitBreaker]()
val counter = new AtomicInteger(0)

val proxyHost = "127.0.0.1"
val proxyPort = 8080

// HTTP client(s)
val targetHostLocal = "local"
val targetHostExternal = "external"

val counter = new AtomicInteger(0)

// HTTP client(s)
val clients = 1 to 2
clients.par.foreach(clientID => httpClient(clientID, proxyHost, proxyPort, targetHostLocal, 10))

def httpClient(clientId: Int, proxyHost: String, proxyPort: Int, targetHost: String, nbrOfRequests: Int) = {
def logResponse(response: HttpResponse) = {
val id = if (response.getHeader("X-Correlation-ID").isPresent) {
response.getHeader("X-Correlation-ID").get().value()
} else {
"N/A"
}
logger.info(s"Client: $clientId got response with id: $id and status: ${response.status.intValue()}")
}

Source(1 to nbrOfRequests)
.throttle(1, 1.seconds)
.wireTap(each => logger.info(s"Client: $clientId about to send $each/$nbrOfRequests request..."))
.mapAsync(1)(_ => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/").withHeaders(Seq(RawHeader("Host", targetHost)))))
.wireTap(response => logger.info(s"Client: $clientId got response: ${response.status.intValue()}"))
.wireTap(each => logger.info(s"Client: $clientId about to send request with id: $clientId-$each..."))
.mapAsync(1)(each => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/").withHeaders(Seq(RawHeader("Host", targetHost), RawHeader("X-Correlation-ID", s"$clientId-$each")))))
.wireTap(response => logResponse(response))
// https://nightlies.apache.org/pekko/docs/pekko-http/1.0/docs/implications-of-streaming-http-entity.html
.runForeach(response => response.discardEntityBytes())
}

// ReverseProxy
def NotFound(path: String) = HttpResponse(
def NotFound(id: String = "N/A", path: String) = HttpResponse(
404,
entity = HttpEntity(ContentTypes.`application/json`, Json.obj("error" -> Json.fromString(s"$path not found")).noSpaces)
)
).withHeaders(Seq(RawHeader("X-Correlation-ID", id)))

def GatewayTimeout() = HttpResponse(
def GatewayTimeout(id: String = "N/A") = HttpResponse(
504,
entity = HttpEntity(ContentTypes.`application/json`, Json.obj("error" -> Json.fromString(s"Target servers timeout")).noSpaces)
)
).withHeaders(Seq(RawHeader("X-Correlation-ID", id)))

def BadGateway(message: String) = HttpResponse(
def BadGateway(id: String = "N/A", message: String) = HttpResponse(
502,
entity = HttpEntity(ContentTypes.`application/json`, Json.obj("error" -> Json.fromString(message)).noSpaces)
)
).withHeaders(Seq(RawHeader("X-Correlation-ID", id)))

val services: Map[String, Seq[Target]] = Map(
targetHostLocal -> Seq(
Expand All @@ -101,17 +110,22 @@ object ReverseProxy extends App {
)
)

def getHost(request: HttpRequest): String = request.header[Host].map(_.host.address()).getOrElse("--")

def handlerWithCircuitBreaker(request: HttpRequest): Future[HttpResponse] = {
val host = getHost(request)
val host = request.header[Host].map(_.host.address()).getOrElse("N/A")
val id = if (request.getHeader("X-Correlation-ID").isPresent) {
request.getHeader("X-Correlation-ID").get().value()
} else {
"N/A"
}

def headers(target: Target) = {
val headersIn: Seq[HttpHeader] =
request.headers.filterNot(t => t.name() == "Host") :+
Host(target.host, target.port) :+
RawHeader("X-Fowarded-Host", host) :+
RawHeader("X-Fowarded-Scheme", request.uri.scheme)
RawHeader("X-Fowarded-Scheme", request.uri.scheme) :+
RawHeader("X-Correlation-ID", id)

// Filter, to avoid log noise, see: https://github.com/akka/akka-http/issues/64
val filteredHeaders = headersIn.toList.filterNot(each => each.name() == "Timeout-Access")
filteredHeaders
Expand All @@ -132,17 +146,18 @@ object ReverseProxy extends App {
val target = seq.apply(index)
val circuitBreaker = circuitBreakers.computeIfAbsent(target.url, _ => new CircuitBreaker(
system.scheduler,
// TODO 'maxFailures' vs 'times' above
maxFailures = 5,
callTimeout = 30.seconds,
resetTimeout = 10.seconds))
val proxyReq = request.withUri(uri(target)).withHeaders(headers(target))
circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq))
}.recover {
case _: akka.pattern.CircuitBreakerOpenException => BadGateway("Circuit breaker opened")
case _: TimeoutException => GatewayTimeout()
case e => BadGateway(e.getMessage)
case _: akka.pattern.CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened")
case _: TimeoutException => GatewayTimeout(id)
case e => BadGateway(id, e.getMessage)
}
case None => Future.successful(NotFound(host))
case None => Future.successful(NotFound(id, host))
}
}

Expand All @@ -158,14 +173,20 @@ object ReverseProxy extends App {

// Local target servers (with faulty behaviour)
val echoRoute: Route =
extractUri { uri =>
extractRequest { request =>

complete {
val id = if (request.getHeader("X-Correlation-ID").isPresent) {
request.getHeader("X-Correlation-ID").get().value()
} else {
"N/A"
}

// Adjust to provoke more retries on ReverseProxy
val codes = List(200, 200, 200, 500, 500, 500)
val randomResponse = codes(new scala.util.Random().nextInt(codes.length))
logger.info(s"" +
s"Target server listening on: ${uri.authority.host}:${uri.effectivePort} got echo request, reply with: $randomResponse")
StatusCode.int2StatusCode(randomResponse)
logger.info(s"Target server listening on: ${request.uri.authority.host}:${request.uri.effectivePort} got echo request with id: $id, reply with: $randomResponse")
(StatusCode.int2StatusCode(randomResponse), Seq(RawHeader("X-Correlation-ID", id)))
}
}

Expand Down Expand Up @@ -224,8 +245,13 @@ object Retry {
case Success(t) =>
// we need to cast here
val code = t.asInstanceOf[HttpResponse].status.intValue()
val id = if (t.asInstanceOf[HttpResponse].getHeader("X-Correlation-ID").isPresent) {
t.asInstanceOf[HttpResponse].getHeader("X-Correlation-ID").get().value()
} else {
"N/A"
}
if (code >= 500) {
logger.info(s"Got 5xx server error from target server. Retries left: ${times - 1}")
logger.info(s"Got 5xx server error from target server for id: $id. Retries left: ${times - 1}")
retryPromise[T](times - 1, promise, Some(new RuntimeException("Got 500")), f)
} else {
promise.trySuccess(t)
Expand Down

0 comments on commit 20ac7e6

Please sign in to comment.