From ac770c8f0f4ea13e9410eae05f73c9f6d6bededd Mon Sep 17 00:00:00 2001
From: Haemin Yoo
Date: Thu, 17 Oct 2024 09:12:19 +0900
Subject: [PATCH] shard-manager: only allow alive pods to register (#146)

* shard-manager: only allow alive pods to register

* Fail when unhealthy pod tries to register

* Clarify error message
---
 .../devsisters/shardcake/ShardManager.scala | 30 +++++++++++--------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala
index 68868c3..21d5047 100644
--- a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala
+++ b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala
@@ -30,19 +30,23 @@ class ShardManager(
   def getShardingEvents: ZStream[Any, Nothing, ShardingEvent] =
     ZStream.fromHub(eventsHub)
 
-  def register(pod: Pod): UIO[Unit] =
-    for {
-      _     <- ZIO.logInfo(s"Registering $pod")
-      state <- stateRef.updateAndGetZIO(state =>
-                 ZIO
-                   .succeed(OffsetDateTime.now())
-                   .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata(pod, cdt))))
-               )
-      _     <- ManagerMetrics.pods.increment
-      _     <- eventsHub.publish(ShardingEvent.PodRegistered(pod.address))
-      _     <- ZIO.when(state.unassignedShards.nonEmpty)(rebalance(false))
-      _     <- persistPods.forkDaemon
-    } yield ()
+  def register(pod: Pod): Task[Unit] =
+    ZIO.ifZIO(healthApi.isAlive(pod.address))(
+      onTrue = for {
+        _     <- ZIO.logInfo(s"Registering $pod")
+        state <- stateRef.updateAndGetZIO(state =>
+                   ZIO
+                     .succeed(OffsetDateTime.now())
+                     .map(cdt => state.copy(pods = state.pods.updated(pod.address, PodWithMetadata(pod, cdt))))
+                 )
+        _     <- ManagerMetrics.pods.increment
+        _     <- eventsHub.publish(ShardingEvent.PodRegistered(pod.address))
+        _     <- ZIO.when(state.unassignedShards.nonEmpty)(rebalance(rebalanceImmediately = false))
+        _     <- persistPods.forkDaemon
+      } yield (),
+      onFalse = ZIO.logWarning(s"Pod $pod requested to register but is not alive, ignoring") *>
+        ZIO.fail(new RuntimeException(s"Pod $pod is not healthy, refusing to register"))
+    )
 
   def notifyUnhealthyPod(podAddress: PodAddress): UIO[Unit] =
     ZIO