Skip to content

Commit

Permalink
Beautify
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Jun 7, 2024
1 parent f9cc5e1 commit 0d6731e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ The class [SSEtoElasticsearch](src/main/scala/alpakka/sse_to_elasticsearch/SSEto
workflow, using the `title` attribute as identifier from the SSE entity to fetch the `extract` from the Wikipedia API,
eg
for [Douglas Adams](https://en.wikipedia.org/w/api.php?format=json&action=query&prop=extracts&exlimit=max&explaintext&exintro&titles=Douglas_Adams).
Text processing on this content using [opennlp](https://opennlp.apache.org/docs/1.9.3/manual/opennlp.html)
Text processing on this content using [opennlp](https://opennlp.apache.org/docs/2.3.3/manual/opennlp.html)
yields `personsFound`, which are added to the `wikipediaedits` Elasticsearch index.
The index is queried periodically and the content may also be viewed with a Browser, eg

Expand Down
44 changes: 26 additions & 18 deletions src/main/scala/akkahttp/ReverseProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,25 @@ import scala.util.{Failure, Success}
* Inspired by:
* https://github.com/mathieuancelin/akka-http-reverse-proxy
*
* HTTP reverse proxy echo PoC with:
* - weighted round robin load balancing
* - retry on 5xx
* - CircuitBreaker per target servers to avoid overload
* - HTTP Header X-Correlation-ID for tracing (only for Mode.local)
* HTTP reverse proxy server echo PoC with:
* - Weighted round robin load balancing
* - Retry on 5xx
* - CircuitBreaker per target server to avoid overload
* - HTTP Header `X-Correlation-ID` for tracing (only for Mode.local)
*
* Mode.local:
* HTTP client(s) --> ReverseProxy --> local target server(s)
*
* Mode.external:
* HTTP client(s) --> ReverseProxy --> https://httpstat.us:443
* Mode.remote:
* HTTP client(s) --> ReverseProxy --> remote target server(s)
*
* curl client:
* curl -H "Host: local" -H "X-Correlation-ID: 1-1" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/mypath
* curl -H "Host: external" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/200
* curl -H "Host: remote" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/200
*
* wrk perf client:
* wrk -t2 -c10 -d10s -H "Host: local" --latency http://127.0.0.1:8080/mypath
* wrk -t2 -c10 -d10s -H "Host: external" --latency http://127.0.0.1:8080/200
* wrk -t2 -c10 -d10s -H "Host: remote" --latency http://127.0.0.1:8080/200
*
* This conceptual PoC works but may not scale well, possible bottlenecks:
* - Combination of Retry/CircuitBreaker
Expand Down Expand Up @@ -76,20 +76,21 @@ object ReverseProxy extends App {
Target.weighted("http://127.0.0.1:9082", 2),
Target.weighted("http://127.0.0.1:9083", 3)
),
Mode.external -> Seq(
Mode.remote -> Seq(
Target.weighted("https://httpstat.us:443", 1),
Target.weighted("https://httpstat.us:443", 2),
Target.weighted("https://httpstat.us:443", 3)
)
)

targetServers(maxConnections = 5) // 1-1024
localTargetServers(maxConnections = 5) // 1-1024
reverseProxy()
// Switch here to force ReverseProxy to forward requests to local or remote target server(s)
clients(nbrOfClients = 10, requestsPerClient = 10, Mode.local)

// HTTP client(s)
def clients(nbrOfClients: Int = 1, requestsPerClient: Int = 1, mode: Mode): Unit = {
logger.info(s"Running $nbrOfClients clients, each sending $requestsPerClient requests")
logger.info(s"Running $nbrOfClients client(s), each sending $requestsPerClient requests")
val clients = 1 to nbrOfClients
clients.par.foreach(clientID => httpClient(clientID, proxyHost, proxyPort, mode, requestsPerClient))

Expand All @@ -100,16 +101,22 @@ object ReverseProxy extends App {
msg.onComplete(msg => logger.info(s"Client: $clientId got response: ${response.status.intValue()} for id: $id and msg: ${msg.getOrElse("N/A")}"))
}

val fixedPath = mode match {
case Mode.local => ""
case Mode.remote => "random/200,201,500-504"
}

Source(1 to nbrOfRequests)
.throttle(1, 1.second, 10, ThrottleMode.shaping)
.wireTap(each => logger.info(s"Client: $clientId about to send request with id: $clientId-$each..."))
.mapAsync(1)(each => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/").withHeaders(Seq(RawHeader("Host", targetHost.toString), RawHeader("X-Correlation-ID", s"$clientId-$each")))))
.mapAsync(1)(each => http.singleRequest(HttpRequest(uri = s"http://$proxyHost:$proxyPort/$fixedPath")
.withHeaders(Seq(RawHeader("Host", targetHost.toString), RawHeader("X-Correlation-ID", s"$clientId-$each")))))
.wireTap(response => logResponse(response))
.runWith(Sink.ignore)
}
}

// ReverseProxy
// ReverseProxy server
def reverseProxy(): Unit = {
def NotFound(id: String, path: String) = HttpResponse(
404,
Expand Down Expand Up @@ -157,6 +164,7 @@ object ReverseProxy extends App {
Retry.retry[HttpResponse](times = 3) {
val index = requestCounter.incrementAndGet() % (if (seq.isEmpty) 1 else seq.size)
val target = seq(index)
logger.info(s"Forwarding request with id: $id to $mode target server: ${target.url}")
val circuitBreaker = circuitBreakers.computeIfAbsent(target.url, _ => new CircuitBreaker(
system.scheduler,
// A low value opens the circuit breaker for subsequent requests (until resetTimeout)
Expand Down Expand Up @@ -188,7 +196,7 @@ object ReverseProxy extends App {
}

// Local target servers (with faulty behaviour and throttled)
def targetServers(maxConnections: Int): Unit = {
def localTargetServers(maxConnections: Int): Unit = {
val echoRoute: Route =
extractRequest { request =>
complete {
Expand Down Expand Up @@ -217,9 +225,9 @@ object ReverseProxy extends App {

futTargetServer.onComplete {
case Success(b) =>
logger.info(s"Target server started, listening on: ${b.localAddress}")
logger.info(s"Local target server started, listening on: ${b.localAddress}")
case Failure(e) =>
logger.info(s"Target server could not bind to... Exception message: ${e.getMessage}")
logger.info(s"Local target server could not bind to... Exception message: ${e.getMessage}")
system.terminate()
}
}
Expand Down Expand Up @@ -248,7 +256,7 @@ object ReverseProxy extends App {

object Mode extends Enumeration {
type Mode = Value
val local, external = Value
val local, remote = Value
}
}

Expand Down

0 comments on commit 0d6731e

Please sign in to comment.