Skip to content

Commit

Permalink
Add external target server and beautify
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Feb 14, 2024
1 parent 0d22ae3 commit 1e477da
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions src/main/scala/akkahttp/ReverseProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.apache.pekko.http.scaladsl.model.headers.{Host, RawHeader}
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.pattern.CircuitBreaker
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.stream.scaladsl.Source
import org.slf4j.{Logger, LoggerFactory}

import java.util.concurrent.ConcurrentHashMap
Expand All @@ -20,18 +20,24 @@ import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
import scala.util.{Failure, Success}

/**
* HTTP reverse proxy with:
* Inspired by:
* https://github.com/mathieuancelin/akka-http-reverse-proxy
*
* HTTP reverse proxy PoC with:
* - weighted round robin load balancing
* - retry
* - retry on 5xx
*
* Setup:
* HTTP client(s) --> ReverseProxy --> Target server(s)
* HTTP client(s) --> ReverseProxy --> target server(s)
*
* Inspired by:
* https://github.com/mathieuancelin/akka-http-reverse-proxy
*
* curl:
* curl -H "Host: local" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080
* curl client:
* curl -H "Host: local" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/mypath
* curl -H "Host: external" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/200
* wrk perf client:
* wrk -t2 -c10 -d10s -H "Host: external" --latency http://127.0.0.1:8080/200
*/
object ReverseProxy extends App {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
Expand All @@ -47,21 +53,22 @@ object ReverseProxy extends App {
val proxyPort = 8080

val targetHostLocal = "local"
val targetHostExternal = "external"

val counter = new AtomicInteger(0)

// HTTP client(s)
val clients = 1 to 2
clients.par.foreach(clientID => httpClient(clientID, proxyHost, proxyPort))
clients.par.foreach(clientID => httpClient(clientID, proxyHost, proxyPort, targetHostLocal, 10))

def httpClient(clientId: Int, proxyHost: String, proxyPort: Int) = {
val nbrOfRequests = 10
def httpClient(clientId: Int, proxyHost: String, proxyPort: Int, targetHost: String, nbrOfRequests: Int) = {
Source(1 to nbrOfRequests)
.throttle(1, 1.seconds)
.wireTap(each => logger.info(s"Client: $clientId about to issue $each/$nbrOfRequests request..."))
.mapAsync(1)(_ => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/").withHeaders(Seq(RawHeader("Host", targetHostLocal)))))
.wireTap(each => logger.info(s"Client: $clientId about to send $each/$nbrOfRequests request..."))
.mapAsync(1)(_ => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/").withHeaders(Seq(RawHeader("Host", targetHost)))))
.wireTap(response => logger.info(s"Client: $clientId got response: ${response.status.intValue()}"))
.runWith(Sink.ignore)
// https://nightlies.apache.org/pekko/docs/pekko-http/1.0/docs/implications-of-streaming-http-entity.html
.runForeach(response => response.discardEntityBytes())
}

// ReverseProxy
Expand All @@ -82,9 +89,14 @@ object ReverseProxy extends App {

val services: Map[String, Seq[Target]] = Map(
targetHostLocal -> Seq(
Target.weighted("http://127.0.0.1:9081", 3),
Target.weighted("http://127.0.0.1:9081", 1),
Target.weighted("http://127.0.0.1:9082", 2),
Target.weighted("http://127.0.0.1:9083", 1)
Target.weighted("http://127.0.0.1:9083", 3)
),
targetHostExternal -> Seq(
Target.weighted("https://httpstat.us:443", 1),
Target.weighted("https://httpstat.us:443", 2),
Target.weighted("https://httpstat.us:443", 3)
)
)

Expand All @@ -108,11 +120,12 @@ object ReverseProxy extends App {
Host(target.host) :+
RawHeader("X-Fowarded-Host", host) :+
RawHeader("X-Fowarded-Scheme", request.uri.scheme)
val uri = request.uri.copy(
val uri: Uri = request.uri.copy(
scheme = target.scheme,
authority = Authority(host = Uri.NamedHost(target.host), port = target.port))
// Filter, to avoid log noise, see: https://github.com/akka/akka-http/issues/64
val proxyReq = request.withUri(uri).withHeaders(headersIn.toList.filterNot(each => each.name() == "Timeout-Access"))
val filteredHeaders = headersIn.toList.filterNot(each => each.name() == "Timeout-Access")
val proxyReq = request.withUri(uri).withHeaders(filteredHeaders)
circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq))
}.recover {
case _: akka.pattern.CircuitBreakerOpenException => BadGateway("Circuit breaker opened")
Expand All @@ -133,16 +146,18 @@ object ReverseProxy extends App {
system.terminate()
}

// Local Target servers simulation
// Local target servers (with faulty behaviour)
val echoRoute: Route =
path("")(
get {
extractUri { uri =>
complete {
// Adjust to provoke more retries on ReverseProxy
val codes = List(200, 200, 200, 500, 500, 500)
val randomResponse = codes(new scala.util.Random().nextInt(codes.length))
logger.info(s"Local target server got echo request, reply with: $randomResponse")
complete(StatusCode.int2StatusCode(randomResponse))
})
// TODO Why is the port 0?
logger.info(s"Local target server listening on: ${uri.authority.port} got echo request, reply with: $randomResponse")
StatusCode.int2StatusCode(randomResponse)
}
}

services.get(targetHostLocal).foreach(targetSeq =>
targetSeq.foreach(target => {
Expand Down Expand Up @@ -200,7 +215,7 @@ object Retry {
// we need to cast here
val code = t.asInstanceOf[HttpResponse].status.intValue()
if (code >= 500) {
logger.info(s"Got 50x server error from target server on attempt: $times/3")
logger.info(s"Got 5xx server error from target server on attempt: $times/3")
retryPromise[T](times - 1, promise, Some(new RuntimeException("Got 500")), f)
} else {
promise.trySuccess(t)
Expand Down

0 comments on commit 1e477da

Please sign in to comment.