Skip to content

Commit af7c0b4

Browse files
dfakhritdinovDenys Fakhritdinov
andauthored
support stashing on rebalance in ClusterShardingLocal (#316)
Co-authored-by: Denys Fakhritdinov <dfakhritdinov@evolution.com>
1 parent a9e6704 commit af7c0b4

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

cluster-sharding/src/main/scala/com/evolutiongaming/akkaeffect/cluster/sharding/ClusterShardingLocal.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,16 @@ object ClusterShardingLocal {
133133
Map((self, shardIds))
134134
}
135135

136+
var rebalancing = false
137+
var stashed = Vector.empty[ShardRegion.Msg]
138+
139+
object Unstash
140+
136141
def receive: Receive = {
137142

138143
case RegionMsg.Rebalance =>
144+
rebalancing = true
145+
context.system.scheduler.scheduleOnce(1.second, self, Unstash)
139146
allocationStrategy
140147
.rebalance(allocation(), Set.empty)
141148
.onComplete { _ =>
@@ -148,6 +155,14 @@ object ClusterShardingLocal {
148155
val shards = allocation().values.flatten.toList
149156
context.sender().tell(shards, context.self)
150157

158+
case Unstash =>
159+
rebalancing = false
160+
stashed.foreach(self.tell(_, context.self))
161+
stashed = Vector.empty
162+
163+
case msg: ShardRegion.Msg if rebalancing =>
164+
stashed = stashed :+ msg
165+
151166
case msg: ShardRegion.Msg =>
152167
val sender = context.sender()
153168
val shardId = extractShardId(msg)

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ThisBuild / version := "4.1.2-SNAPSHOT"
1+
ThisBuild / version := "4.1.2"

0 commit comments

Comments
 (0)