@@ -5,11 +5,13 @@ import com.devsisters.shardcake.interfaces.Pods.BinaryMessage
5
5
import com .devsisters .shardcake .protobuf .sharding .ZioSharding .ShardingService
6
6
import com .devsisters .shardcake .protobuf .sharding ._
7
7
import com .google .protobuf .ByteString
8
+ import io .grpc ._
8
9
import io .grpc .protobuf .services .ProtoReflectionService
9
- import io .grpc .{ ServerBuilder , Status , StatusException , StatusRuntimeException }
10
- import scalapb .zio_grpc .{ ScopedServer , ServiceList }
11
- import zio .{ Config => _ , _ }
10
+ import scalapb .zio_grpc .ServiceList
12
11
import zio .stream .ZStream
12
+ import zio .{ Config => _ , _ }
13
+
14
+ import java .util .concurrent .TimeUnit
13
15
14
16
abstract class GrpcShardingService (sharding : Sharding , timeout : Duration ) extends ShardingService {
15
17
def assignShards (request : AssignShardsRequest ): ZIO [Any , StatusException , AssignShardsResponse ] =
@@ -56,19 +58,29 @@ object GrpcShardingService {
56
58
val live : ZLayer [Config with Sharding with GrpcConfig , Throwable , Unit ] =
57
59
ZLayer .scoped[Config with Sharding with GrpcConfig ] {
58
60
for {
59
- config <- ZIO .service[Config ]
60
- grpcConfig <- ZIO .service[GrpcConfig ]
61
- sharding <- ZIO .service[Sharding ]
62
- builder = grpcConfig.executor match {
63
- case Some (executor) =>
64
- ServerBuilder
65
- .forPort(config.shardingPort)
66
- .executor(executor)
67
- case None =>
68
- ServerBuilder .forPort(config.shardingPort)
69
- }
70
- services = ServiceList .add(new GrpcShardingService (sharding, config.sendTimeout) {})
71
- _ <- ScopedServer .fromServiceList(builder.addService(ProtoReflectionService .newInstance()), services)
61
+ config <- ZIO .service[Config ]
62
+ grpcConfig <- ZIO .service[GrpcConfig ]
63
+ sharding <- ZIO .service[Sharding ]
64
+ builder = grpcConfig.executor match {
65
+ case Some (executor) =>
66
+ ServerBuilder
67
+ .forPort(config.shardingPort)
68
+ .executor(executor)
69
+ case None =>
70
+ ServerBuilder .forPort(config.shardingPort)
71
+ }
72
+ services <- ServiceList .add(new GrpcShardingService (sharding, config.sendTimeout) {}).bindAll
73
+ server : Server = services
74
+ .foldLeft(builder) { case (builder0, service) => builder0.addService(service) }
75
+ .addService(ProtoReflectionService .newInstance())
76
+ .build()
77
+ _ <- ZIO .acquireRelease(ZIO .attempt(server.start()))(server =>
78
+ ZIO .attemptBlocking {
79
+ server.shutdown()
80
+ server.awaitTermination(grpcConfig.shutdownTimeout.toMillis, TimeUnit .MILLISECONDS )
81
+ server.shutdownNow()
82
+ }.ignore
83
+ )
72
84
} yield ()
73
85
}
74
86
}
0 commit comments