Skip to content

Commit 5d2ebdb

Browse files
fix: significant performance drop with stream.toQueue (#468)
1 parent bac0610 commit 5d2ebdb

File tree

5 files changed

+169
-17
lines changed

5 files changed

+169
-17
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
syntax = "proto3";
2+
3+
option java_multiple_files = true;
4+
option java_package = "scalapb.zio_grpc.helloworld";
5+
option java_outer_classname = "HelloWorldProto";
6+
7+
package helloworld;
8+
9+
// The greeting service definition.
10+
service Greeter {
11+
// Sends a greeting
12+
rpc SayHello (HelloRequest) returns (HelloReply) {}
13+
14+
// Sends a litany of greetings
15+
rpc SayHelloStreaming (HelloRequest) returns (stream HelloReply) {}
16+
}
17+
18+
// The actual message exchanged by the client and the server.
19+
message Hello {
20+
string name = 1;
21+
double d = 2;
22+
float f = 3;
23+
bool b = 4;
24+
int32 n = 5;
25+
int64 l = 6;
26+
oneof choice {
27+
string c1 = 7;
28+
bool c2 = 8;
29+
}
30+
message Pet {
31+
enum Color {
32+
BLACK = 0;
33+
WHITE = 1;
34+
BLUE = 2;
35+
RED = 3;
36+
YELLOW = 4;
37+
GREEN = 5;
38+
}
39+
string name = 1;
40+
Color color = 2;
41+
}
42+
repeated Pet pets = 9;
43+
}
44+
45+
// The request message from the client.
46+
message HelloRequest {
47+
Hello request = 1;
48+
}
49+
50+
// The response message from the server.
51+
message HelloReply {
52+
Hello response = 1;
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package scalapb.zio_grpc
2+
3+
import scalapb.zio_grpc.helloworld.testservice.ZioTestservice.ZGreeter
4+
import scalapb.zio_grpc.helloworld.testservice.{HelloReply, HelloRequest}
5+
import io.grpc.Status
6+
import zio.ZIO
7+
import zio.stream.ZStream
8+
9+
class GreeterImpl(size: Long) extends ZGreeter[Any] {
10+
11+
def sayHello(request: HelloRequest): ZIO[Any, Status, HelloReply] =
12+
ZIO.succeed(HelloReply(request.request))
13+
14+
def sayHelloStreaming(request: HelloRequest): ZStream[Any, Status, HelloReply] =
15+
ZStream.repeat(HelloReply(request.request)).take(size)
16+
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package scalapb.zio_grpc
2+
3+
import io.grpc.ManagedChannelBuilder
4+
import io.grpc.ServerBuilder
5+
import scalapb.zio_grpc.helloworld.testservice._
6+
import zio._
7+
import java.time
8+
9+
object ServerStreamingBenchmarkApp extends ZIOAppDefault {
10+
11+
val size = 100000L
12+
13+
val server =
14+
ServerLayer.fromEnvironment[ZioTestservice.Greeter](ServerBuilder.forPort(50051))
15+
16+
val client =
17+
ZLayer.scoped[Server] {
18+
for {
19+
ss <- ZIO.service[Server]
20+
port <- ss.port.orDie
21+
ch = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext()
22+
client <- ZioTestservice.GreeterClient.scoped(ZManagedChannel(ch)).orDie
23+
} yield client
24+
}
25+
26+
val service =
27+
ZLayer.succeed[ZioTestservice.Greeter] {
28+
new GreeterImpl(size)
29+
}
30+
31+
def run = ZIO
32+
.foreach(Array(8192, 65536)) { queueSize =>
33+
val props = java.lang.System.getProperties();
34+
props.setProperty("zio-grpc.backpressure-queue-size", queueSize.toString());
35+
36+
for {
37+
_ <- Console.printLine(s"Starting with queue size $queueSize")
38+
cpt <- Ref.make(0)
39+
start <- Clock.instant.flatMap(Ref.make(_))
40+
result <- ZioTestservice.GreeterClient
41+
.sayHelloStreaming(HelloRequest(request = Some(Hello(name = "Testing streaming"))))
42+
.tap(_ => cpt.update(_ + 1))
43+
.tap { _ =>
44+
for {
45+
now <- Clock.instant
46+
started <- start.get
47+
_ <- ZIO.when(time.Duration.between(started, now).toSeconds() >= 10)(
48+
start.set(now) *> cpt.get.flatMap(cpt => Console.printLine(s"Received $cpt messages"))
49+
)
50+
} yield ()
51+
}
52+
.runDrain
53+
.timed
54+
_ <- Console.printLine(s"queue size: $queueSize (${result._1.toMillis()}ms)")
55+
} yield ()
56+
}
57+
.provide(service >+> server >+> client)
58+
59+
}

build.sbt

+26
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,32 @@ lazy val e2e =
125125
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
126126
)
127127

128+
lazy val benchmarks =
129+
projectMatrix
130+
.in(file("benchmarks"))
131+
.dependsOn(core)
132+
.defaultAxes()
133+
.enablePlugins(LocalCodeGenPlugin)
134+
.jvmPlatform(ScalaVersions)
135+
.settings(stdSettings)
136+
.settings(
137+
crossScalaVersions := Seq(Scala212, Scala213),
138+
publish / skip := true,
139+
libraryDependencies ++= Seq(
140+
"dev.zio" %% "zio" % Version.zio,
141+
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
142+
"io.grpc" % "grpc-netty" % Version.grpc
143+
),
144+
Compile / PB.targets := Seq(
145+
scalapb.gen(grpc = true) -> (Compile / sourceManaged).value,
146+
genModule(
147+
"scalapb.zio_grpc.ZioCodeGenerator$"
148+
) -> (Compile / sourceManaged).value
149+
),
150+
PB.protocVersion := "3.13.0",
151+
codeGenClasspath := (codeGenJVM212 / Compile / fullClasspath).value
152+
)
153+
128154
lazy val docs = project
129155
.enablePlugins(LocalCodeGenPlugin)
130156
.in(file("zio-grpc-docs"))

core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala

+14-17
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import scalapb.zio_grpc.RequestContext
1111
import io.grpc.Metadata
1212
import scalapb.zio_grpc.SafeMetadata
1313
import zio.stm.TSemaphore
14-
import zio.stream.Take
1514

1615
class ZServerCallHandler[Req, Res](
1716
runtime: Runtime[Any],
@@ -109,31 +108,29 @@ object ZServerCallHandler {
109108
call: ZServerCall[Res],
110109
stream: ZStream[Any, Status, Res]
111110
): ZIO[Any, Status, Unit] = {
112-
def innerLoop(queue: Dequeue[Take[Status, Res]], buffer: Ref[Chunk[Res]]): ZIO[Any, Status, Boolean] =
113-
buffer
114-
.modify(chunk => chunk.headOption -> chunk.drop(1))
111+
def innerLoop(queue: Dequeue[Exit[Option[Status], Res]]): ZIO[Any, Status, Boolean] =
112+
queue.take
115113
.flatMap {
116-
case None =>
117-
queue.take.flatMap(
118-
_.foldZIO(ZIO.succeed(false), ZIO.failCause(_), buffer.set(_) *> innerLoop(queue, buffer))
119-
)
120-
case Some(res) =>
121-
call.sendMessage(res).as(true)
114+
case Exit.Success(res) => call.sendMessage(res).as(true)
115+
case Exit.Failure(cause) =>
116+
cause.failureOrCause match {
117+
case Left(Some(status)) => ZIO.fail(status)
118+
case Left(None) => ZIO.succeed(false)
119+
case Right(cause) => ZIO.failCause(cause)
120+
}
122121
}
123122
.repeatWhileZIO(res => call.isReady.map(_ && res))
124123

125-
def outerLoop(queue: Dequeue[Take[Status, Res]])(buffer: Ref[Chunk[Res]]): ZIO[Any, Status, Boolean] =
126-
(call.awaitReady *> innerLoop(queue, buffer))
124+
def outerLoop(queue: Dequeue[Exit[Option[Status], Res]]): ZIO[Any, Status, Boolean] =
125+
(call.awaitReady *> innerLoop(queue))
127126
.repeatWhile(identity)
128127

129128
for {
130129
queueSize <- backpressureQueueSize
131-
_ <- ZIO.scoped(
130+
_ <- ZIO.scoped[Any](
132131
stream
133-
.toQueue(queueSize)
134-
.flatMap { queue =>
135-
Ref.make[Chunk[Res]](Chunk.empty).flatMap(outerLoop(queue))
136-
}
132+
.toQueueOfElements(queueSize)
133+
.flatMap(outerLoop)
137134
)
138135
} yield ()
139136
}

0 commit comments

Comments
 (0)