Skip to content

Commit

Permalink
Add gatling simulations
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Jun 14, 2024
1 parent 0dc84e1 commit 200db53
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 11 deletions.
13 changes: 9 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,14 @@ libraryDependencies ++= Seq(
"ca.uhn.hapi" % "hapi-structures-v25" % "2.3",
"ca.uhn.hapi" % "hapi-structures-v281" % "2.3",

"org.apache.opennlp" % "opennlp-tools" % "2.3.1",
"org.apache.opennlp" % "opennlp-tools" % "2.3.3",

"org.apache.httpcomponents.client5" % "httpclient5" % "5.3.1",
"org.apache.httpcomponents.core5" % "httpcore5" % "5.2.4",
"commons-io" % "commons-io" % "2.16.1",
"org.apache.commons" % "commons-lang3" % "3.12.0",
"com.twitter" %% "bijection-avro" % "0.9.7",

//"io.apicurio" % "apicurio-registry-utils-serde" % "1.3.2.Final",


"org.apache.camel" % "camel-core" % "3.20.2",
"org.apache.camel" % "camel-reactive-streams" % "3.20.2",
Expand Down Expand Up @@ -146,7 +144,12 @@ libraryDependencies ++= Seq(

"org.scalatest" %% "scalatest" % "3.2.18" % Test,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test,
"org.assertj" % "assertj-core" % "3.25.3" % Test
"org.assertj" % "assertj-core" % "3.25.3" % Test,

// https://docs.gatling.io/reference/integrations/build-tools/sbt-plugin/
"io.gatling" % "gatling-core" % "3.11.3",
"io.gatling.highcharts" % "gatling-charts-highcharts" % "3.11.3",
"io.gatling" % "gatling-test-framework" % "3.11.3"
)

resolvers += "repository.jboss.org-public" at "https://repository.jboss.org/nexus/content/groups/public"
Expand All @@ -166,6 +169,8 @@ run / fork := true

Test / parallelExecution := false

enablePlugins(GatlingPlugin)

// Needed as long as this lib is in the dependencies
// https://eed3si9n.com/sbt-1.5.0
// https://www.scala-lang.org/blog/2021/02/16/preventing-version-conflicts-with-versionscheme.html
Expand Down
4 changes: 3 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1")
//The now built in dependencyTree task is usually enough
//https://www.scala-sbt.org/1.x/docs/sbt-1.4-Release-Notes.html#sbt-dependency-graph+is+in-sourced
//addDependencyTreePlugin
//addDependencyTreePlugin

addSbtPlugin("io.gatling" % "gatling-sbt" % "4.9.0")
18 changes: 12 additions & 6 deletions src/main/scala/akkahttp/ReverseProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

/**
* Inspired by:
* Conceptual PoC inspired by:
* https://github.com/mathieuancelin/akka-http-reverse-proxy
*
* HTTP reverse proxy server echo PoC with:
* - Weighted round robin load balancing
* - Retry on 5xx
* - Retry on HTTP 5xx from target servers
* - CircuitBreaker per target server to avoid overload
* - HTTP Header `X-Correlation-ID` for tracing (only for Mode.local)
*
Expand All @@ -39,6 +39,16 @@ import scala.util.{Failure, Success}
* Mode.remote:
* HTTP client(s) --> ReverseProxy --> remote target server(s)
*
* Remarks:
* - The target server selection works via the "Host" HTTP header
* - Local/Remote target servers are designed to be flaky to show retry/circuit breaker behavior
* - On top of the built in client, you may also try other clients
* - This PoC may not scale well, possible bottlenecks are:
* - Combination of Retry/CircuitBreaker
* - Round robin impl. with `requestCounter` means shared state
*
* Gatling client: [[ReverseProxySimulation]]
*
* curl client:
* curl -H "Host: local" -H "X-Correlation-ID: 1-1" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/mypath
* curl -H "Host: remote" -o - -i -w " %{time_total}\n" http://127.0.0.1:8080/200
Expand All @@ -47,10 +57,6 @@ import scala.util.{Failure, Success}
* wrk -t2 -c10 -d10s -H "Host: local" --latency http://127.0.0.1:8080/mypath
* wrk -t2 -c10 -d10s -H "Host: remote" --latency http://127.0.0.1:8080/200
*
* This conceptual PoC works but may not scale well, possible bottlenecks:
* - Combination of Retry/CircuitBreaker
* - Round robin impl. with `requestCounter` means shared state
*
* Doc:
* https://pekko.apache.org/docs/pekko/current/common/circuitbreaker.html
* https://pekko.apache.org/docs/pekko-http/current//implications-of-streaming-http-entity.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import scala.concurrent.Future
* Finally move the file to `rootDir/processed`
*
* Run with test class: [[DirectoryWatcherSpec]]
* Run with gatling : [[DirectoryWatcherSimulation]]
*
* Remarks:
* - [[FileAlterationListenerAdaptor]] allows to recursively listen to file changes at runtime
Expand Down
107 changes: 107 additions & 0 deletions src/test/scala/DirectoryWatcherSimulation.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import alpakka.file.uploader.DirectoryWatcher
import io.gatling.core.Predef.*
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.session.Session
import io.gatling.core.structure.{ScenarioBuilder, ScenarioContext}
import io.gatling.core.util.NameGen

import java.nio.file.{Files, Paths}
import java.util.UUID
import scala.concurrent.duration.*

/**
* Show the use of Gatling ActionBuilder to create a custom action
* In our case to generate files to be picked up by the [[DirectoryWatcher]]
* Since there are not HTTP requests issued by this simulation,
* we do not get a report at the end
*
* Run from terminal:
* sbt 'Gatling/testOnly DirectoryWatcherSimulation'
*/
class DirectoryWatcherSimulation extends Simulation {
private val rootDir = "/tmp/directory-watcher-simulation"
private val uploadDir = Paths.get(rootDir, "upload")
private val processedDir = Paths.get(rootDir, "processed")

setupDirectories()
val watcher = DirectoryWatcher(uploadDir, processedDir)

private def setupDirectories(): Unit = {
Files.createDirectories(Paths.get(rootDir))

if (!Files.exists(uploadDir)) {
Files.createDirectories(uploadDir)
println(s"Created temporary upload directory at: $uploadDir")
}

if (!Files.exists(processedDir)) {
Files.createDirectories(processedDir)
println(s"Created temporary processed directory at: $processedDir")
}
}

// Custom action to generate unique files
class FileGenerationAction(next: Action) extends Action with NameGen {

override def name: String = genName("fileGeneration")

override def execute(session: Session): Unit = {
val uuid = UUID.randomUUID().toString
val fileName = s"generated_file_$uuid.txt"
val filePath = uploadDir.resolve(fileName)
val content = "This is a generated file."

Files.write(filePath, content.getBytes)
println(s"Generated file: $filePath")

next ! session
}
}

// Custom action builder to wrap the FileGenerationAction
class FileGenerationActionBuilder extends ActionBuilder {
override def build(ctx: ScenarioContext, next: Action): Action = {
new FileGenerationAction(next)
}
}

class CountFilesProcessedAction(watcher: DirectoryWatcher, next: Action) extends Action {
override def name: String = "CountFilesProcessedAction"

override def execute(session: Session): Unit = {
val processedFilesCount = watcher.countFilesProcessed()
println(s"Total files processed: $processedFilesCount")
next ! session
}
}

class CountFilesProcessedActionBuilder(watcher: DirectoryWatcher) extends ActionBuilder {
override def build(ctx: ScenarioContext, next: Action): Action = {
new CountFilesProcessedAction(watcher, next)
}
}

private val scn: ScenarioBuilder = scenario("DirectoryWatcher")
.exec(new FileGenerationActionBuilder)
.exec { session =>
println("File Generation completed")
session
}
.exec(new CountFilesProcessedActionBuilder(watcher))
.exec { session =>
println("Scenario completed")
session
}

setUp(
scn.inject(
nothingFor(2.seconds),
atOnceUsers(2),
rampUsers(10) during 10.seconds
)
).assertions(
global.responseTime.max.lt(5000),
global.successfulRequests.percent.is(100)
).maxDuration(30.seconds) // For now stop like this
}
44 changes: 44 additions & 0 deletions src/test/scala/ReverseProxySimulation.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

import io.gatling.core.Predef.*
import io.gatling.http.Predef.*

import scala.concurrent.duration.*

/**
* Start [[akkahttp.ReverseProxy]]
* Run this Simulation with:
* sbt 'Gatling/testOnly ReverseProxySimulation'
*/
class ReverseProxySimulation extends Simulation {
val baseUrl = "http://127.0.0.1:8080"

val httpProtocol = http
.baseUrl(baseUrl)
.acceptHeader("application/json")
.userAgentHeader("Gatling")

val scn = scenario("GatlingLocalClient")
.exec(session => session.set("correlationId", 1))
.repeat(10) {
exec(
http("Local Mode Request")
.get("/")
.header("Host", "local")
.header("X-Correlation-ID", session => s"1-${session("correlationId").as[Int]}")
.check(status.is(200))
.check(header("X-Correlation-ID").saveAs("responseCorrelationId"))
)
.exec(session => {
println(s"Got response for id: ${session("responseCorrelationId").as[String]}")
session
})
.exec(session => session.set("correlationId", session("correlationId").as[Int] + 1))
}

setUp(
scn.inject(
atOnceUsers(10),
rampUsers(50).during(30.seconds)
)
).protocols(httpProtocol)
}

0 comments on commit 200db53

Please sign in to comment.