Skip to content

Commit 5491e2d

Browse files
authored
Add benchmarks module (#128)
1 parent ed203e4 commit 5491e2d

File tree

5 files changed

+143
-5
lines changed

5 files changed

+143
-5
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.devsisters.shardcake
2+
3+
import com.devsisters.shardcake.Config
4+
import com.devsisters.shardcake.Server.Message.Ping
5+
import com.devsisters.shardcake.Server.PingPongEntity
6+
import zio._
7+
8+
object Client {
9+
// self host should not be `localhost` to avoid optimization
10+
private val config = ZLayer.succeed(Config.default.copy(selfHost = "not-localhost"))
11+
12+
def send(count: Int, parallelism: Int): Task[Unit] =
13+
ZIO
14+
.scoped[Sharding](
15+
for {
16+
ping <- Sharding.messenger(PingPongEntity)
17+
_ <- ZIO.foreachParDiscard(1 to count)(_ => ping.send("ping")(Ping("ping", _))).withParallelism(parallelism)
18+
} yield ()
19+
)
20+
.provide(config, Server.sharding)
21+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.devsisters.shardcake
2+
3+
import org.openjdk.jmh.annotations._
4+
import zio.{ durationInt, Fiber, Runtime, Unsafe, ZIO }
5+
6+
import java.util.concurrent.TimeUnit
7+
8+
@State(Scope.Thread)
9+
@BenchmarkMode(Array(Mode.Throughput))
10+
@OutputTimeUnit(TimeUnit.SECONDS)
11+
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
12+
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
13+
@Fork(1)
14+
class SendBenchmark {
15+
private var fiber: Fiber[Any, Any] = _
16+
17+
@Setup
18+
def setup(): Unit =
19+
fiber = Unsafe.unsafe(implicit unsafe =>
20+
Runtime.default.unsafe.run(Server.run.forkDaemon <* ZIO.sleep(3.seconds)).getOrThrow()
21+
)
22+
23+
@TearDown
24+
def tearDown(): Unit =
25+
Unsafe.unsafe(implicit unsafe => Runtime.default.unsafe.run(fiber.interrupt))
26+
27+
@Benchmark
28+
def send(): Unit =
29+
Unsafe.unsafe(implicit unsafe => Runtime.default.unsafe.run(Client.send(100, 8)))
30+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.devsisters.shardcake
2+
3+
import com.devsisters.shardcake.Config
4+
import com.devsisters.shardcake.interfaces.Storage
5+
import zio._
6+
import zio.stream.ZStream
7+
8+
import java.util.concurrent.ForkJoinPool
9+
10+
object Server {
11+
sealed trait Message
12+
13+
object Message {
14+
case class Ping(msg: String, replier: Replier[String]) extends Message
15+
}
16+
17+
object PingPongEntity extends EntityType[Message]("ping-pong")
18+
19+
private def behavior(entityId: String, messages: Dequeue[Message]): RIO[Sharding, Nothing] =
20+
messages.take.flatMap { case Message.Ping(msg, replier) => replier.reply(msg) }.forever
21+
22+
private val shardManagerClient: ZLayer[Config, Nothing, ShardManagerClient] =
23+
ZLayer {
24+
for {
25+
config <- ZIO.service[Config]
26+
pod = PodAddress("localhost", config.shardingPort)
27+
shards = (1 to config.numberOfShards).map(_ -> Some(pod)).toMap
28+
} yield new ShardManagerClient {
29+
def register(podAddress: PodAddress): Task[Unit] = ZIO.unit
30+
def unregister(podAddress: PodAddress): Task[Unit] = ZIO.unit
31+
def notifyUnhealthyPod(podAddress: PodAddress): Task[Unit] = ZIO.unit
32+
def getAssignments: Task[Map[Int, Option[PodAddress]]] = ZIO.succeed(shards)
33+
}
34+
}
35+
36+
private val memory: ULayer[Storage] =
37+
ZLayer {
38+
for {
39+
assignmentsRef <- Ref.make(Map.empty[ShardId, Option[PodAddress]])
40+
podsRef <- Ref.make(Map.empty[PodAddress, Pod])
41+
} yield new Storage {
42+
def getAssignments: Task[Map[ShardId, Option[PodAddress]]] = assignmentsRef.get
43+
def saveAssignments(assignments: Map[ShardId, Option[PodAddress]]): Task[Unit] = assignmentsRef.set(assignments)
44+
def assignmentsStream: ZStream[Any, Throwable, Map[ShardId, Option[PodAddress]]] = ZStream.never
45+
def getPods: Task[Map[PodAddress, Pod]] = podsRef.get
46+
def savePods(pods: Map[PodAddress, Pod]): Task[Unit] = podsRef.set(pods)
47+
}
48+
}
49+
50+
private val grpcConfig: ULayer[GrpcConfig] =
51+
ZLayer.succeed(GrpcConfig.default.copy(executor = Some(ForkJoinPool.commonPool())))
52+
53+
val sharding: ZLayer[Config, Throwable, Sharding with GrpcConfig] =
54+
ZLayer.makeSome[Config, Sharding with GrpcConfig](
55+
KryoSerialization.live,
56+
memory,
57+
grpcConfig,
58+
shardManagerClient,
59+
GrpcPods.live,
60+
Sharding.live
61+
)
62+
63+
val run: Task[Unit] =
64+
ZIO
65+
.scoped[Sharding](
66+
for {
67+
_ <- Sharding.registerEntity(PingPongEntity, behavior)
68+
_ <- Sharding.registerScoped
69+
_ <- ZIO.never
70+
} yield ()
71+
)
72+
.provide(
73+
ZLayer.succeed(Config.default),
74+
sharding,
75+
GrpcShardingService.live
76+
)
77+
}

build.sbt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ lazy val root = project
5858
storageRedisson,
5959
serializationKryo,
6060
grpcProtocol,
61-
examples
61+
examples,
62+
benchmarks
6263
)
6364

6465
lazy val core = project
@@ -186,6 +187,14 @@ lazy val examples = project
186187
)
187188
.dependsOn(manager, storageRedis, grpcProtocol, serializationKryo)
188189

190+
lazy val benchmarks = project
191+
.in(file("benchmarks"))
192+
.settings(name := "benchmarks")
193+
.settings(publish / skip := true)
194+
.settings(commonSettings)
195+
.enablePlugins(JmhPlugin)
196+
.dependsOn(grpcProtocol, serializationKryo)
197+
189198
lazy val protobuf = Seq(
190199
PB.protocVersion := "3.19.2"
191200
) ++ Project.inConfig(Test)(sbtprotoc.ProtocPlugin.protobufConfigSettings)

project/plugins.sbt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
2-
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
3-
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.13")
4-
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")
1+
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
2+
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
3+
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.13")
4+
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")
5+
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.7")
56

67
resolvers ++= Resolver.sonatypeOssRepos("snapshots")
78

0 commit comments

Comments
 (0)