Skip to content

Commit 1529037

Browse files
authored
Make grpc executor configurable (#113)
1 parent 9c831a4 commit 1529037

File tree

3 files changed

+38
-16
lines changed

3 files changed

+38
-16
lines changed
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package com.devsisters.shardcake
22

3+
import java.util.concurrent.Executor
4+
35
/**
46
* The configuration for the gRPC client.
7+
*
58
* @param maxInboundMessageSize the maximum message size allowed to be received by the grpc client
9+
* @param executor a custom executor to pass to grpc-java when creating gRPC clients and servers
610
*/
7-
case class GrpcConfig(maxInboundMessageSize: Int)
11+
case class GrpcConfig(maxInboundMessageSize: Int, executor: Option[Executor])
812

913
object GrpcConfig {
1014
val default: GrpcConfig =
11-
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024)
15+
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024, None)
1216
}

protocol-grpc/src/main/scala/com/devsisters/shardcake/GrpcPods.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,23 @@ class GrpcPods(
2525
map.get(pod) match {
2626
case Some((channel, _)) => ZIO.succeed((channel, map))
2727
case None =>
28-
val channel: ZManagedChannel =
29-
ZManagedChannel.apply(
30-
ManagedChannelBuilder
31-
.forAddress(pod.host, pod.port)
32-
.maxInboundMessageSize(config.maxInboundMessageSize)
33-
.usePlaintext()
34-
)
28+
val builder = {
29+
config.executor match {
30+
case Some(executor) =>
31+
ManagedChannelBuilder
32+
.forAddress(pod.host, pod.port)
33+
.executor(executor)
34+
.maxInboundMessageSize(config.maxInboundMessageSize)
35+
.usePlaintext()
36+
case None =>
37+
ManagedChannelBuilder
38+
.forAddress(pod.host, pod.port)
39+
.maxInboundMessageSize(config.maxInboundMessageSize)
40+
.usePlaintext()
41+
}
42+
}
43+
44+
val channel = ZManagedChannel(builder)
3545
// create a fiber that never ends and keeps the connection alive
3646
for {
3747
_ <- ZIO.logDebug(s"Opening connection to pod $pod")

protocol-grpc/src/main/scala/com/devsisters/shardcake/GrpcShardingService.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,22 @@ object GrpcShardingService {
5353
/**
5454
* A layer that creates a gRPC server that exposes the Pods API.
5555
*/
56-
val live: ZLayer[Config with Sharding, Throwable, Unit] =
57-
ZLayer.scoped[Config with Sharding] {
56+
val live: ZLayer[Config with Sharding with GrpcConfig, Throwable, Unit] =
57+
ZLayer.scoped[Config with Sharding with GrpcConfig] {
5858
for {
59-
config <- ZIO.service[Config]
60-
sharding <- ZIO.service[Sharding]
61-
builder = ServerBuilder.forPort(config.shardingPort).addService(ProtoReflectionService.newInstance())
62-
services = ServiceList.add(new GrpcShardingService(sharding, config.sendTimeout) {})
63-
_ <- ScopedServer.fromServiceList(builder, services)
59+
config <- ZIO.service[Config]
60+
grpcConfig <- ZIO.service[GrpcConfig]
61+
sharding <- ZIO.service[Sharding]
62+
builder = grpcConfig.executor match {
63+
case Some(executor) =>
64+
ServerBuilder
65+
.forPort(config.shardingPort)
66+
.executor(executor)
67+
case None =>
68+
ServerBuilder.forPort(config.shardingPort)
69+
}
70+
services = ServiceList.add(new GrpcShardingService(sharding, config.sendTimeout) {})
71+
_ <- ScopedServer.fromServiceList(builder.addService(ProtoReflectionService.newInstance()), services)
6472
} yield ()
6573
}
6674
}

0 commit comments

Comments
 (0)