diff --git a/ring/model.go b/ring/model.go index fb3095172..c4ba64466 100644 --- a/ring/model.go +++ b/ring/model.go @@ -594,6 +594,29 @@ func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int { return instancesCountPerZone } +func (d *Desc) readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() (int, int64) { + readOnlyInstances := 0 + oldestReadOnlyUpdatedTimestamp := int64(0) + first := true + + if d != nil { + for _, ingester := range d.Ingesters { + if !ingester.ReadOnly { + continue + } + + readOnlyInstances++ + if first { + oldestReadOnlyUpdatedTimestamp = ingester.ReadOnlyUpdatedTimestamp + } else { + oldestReadOnlyUpdatedTimestamp = min(oldestReadOnlyUpdatedTimestamp, ingester.ReadOnlyUpdatedTimestamp) + } + first = false + } + } + return readOnlyInstances, oldestReadOnlyUpdatedTimestamp +} + type CompareResult int // CompareResult responses diff --git a/ring/ring.go b/ring/ring.go index bb7e29c28..7d69f3884 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -191,6 +191,12 @@ type Ring struct { // then this value will be 0. oldestRegisteredTimestamp int64 + readOnlyInstances *int // Number of instances with ReadOnly flag set. Only valid if not nil. + // Oldest value of ReadOnlyUpdatedTimestamp for read-only instances. If there are no read-only instances, + // or if any read-only instance has ReadOnlyUpdatedTimestamp == 0 (which should not happen), then this value will be 0. + // Only valid if not nil. + oldestReadOnlyUpdatedTimestamp *int64 + // Maps a token with the information of the instance holding it. This map is immutable and // cannot be changed in place because it's shared "as is" between subrings (the only way to // change it is to create a new one and replace it). @@ -372,6 +378,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone() writableInstancesWithTokensCount := ringDesc.writableInstancesWithTokensCount() writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone() + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() r.mtx.Lock() defer r.mtx.Unlock() @@ -387,6 +394,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone r.oldestRegisteredTimestamp = oldestRegisteredTimestamp r.lastTopologyChange = now + r.readOnlyInstances = &readOnlyInstances + r.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp // Invalidate all cached subrings. if r.shuffledSubringCache != nil { @@ -697,18 +706,16 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // // Subring returned by this method does not contain instances that have read-only field set. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - // Use all possible instances if shuffle sharding is disabled. We don't set size to r.InstancesCount(), because - // that could lead to not all instances being returned when ring zones are unbalanced. - // Reason for not returning entire ring directly is that we need to filter out read-only instances. - if size <= 0 { - size = math.MaxInt - } - if cached := r.getCachedShuffledSubring(identifier, size); cached != nil { return cached } - result := r.shuffleShard(identifier, size, 0, time.Now()) + var result *Ring + if size <= 0 { + result = r.filterOutReadOnlyInstances(0, time.Now()) + } else { + result = r.shuffleShard(identifier, size, 0, time.Now()) + } // Only cache subring if it is different from this ring, to avoid deadlocks in getCachedShuffledSubring, // when we update the cached ring. if result != r { @@ -725,17 +732,20 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // // This function supports caching, but the cache will only be effective if successive calls for the // same identifier are with the same lookbackPeriod and increasing values of now. +// +// Subring returned by this method does not contain read-only instances that have changed their state +// before the lookback period. func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { - // Nothing to do if the shard size is not smaller than the actual ring. - if size <= 0 || r.InstancesCount() <= size { - return r - } - if cached := r.getCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now); cached != nil { return cached } - result := r.shuffleShard(identifier, size, lookbackPeriod, now) + var result *Ring + if size <= 0 { + result = r.filterOutReadOnlyInstances(lookbackPeriod, now) + } else { + result = r.shuffleShard(identifier, size, lookbackPeriod, now) + } if result != r { r.setCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now, result) @@ -756,6 +766,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // // If any instance had RegisteredTimestamp equal to 0 (it would not cause additional lookup of next instance), // then r.oldestRegisteredTimestamp is zero too, and we skip this optimization. + // + // Even if some instances are read-only, they must have changed their read-only status within lookback window + // (because they were all registered within lookback window), so they would be included in the result. if lookbackPeriod > 0 && r.oldestRegisteredTimestamp > 0 && r.oldestRegisteredTimestamp >= lookbackUntil { return r } @@ -778,6 +791,19 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur var tokens []uint32 if r.cfg.ZoneAwarenessEnabled { + // If we're going to include all instances from this zone, we can simply filter out + // unwanted instances, and avoid iterating through tokens. + if numInstancesPerZone >= r.instancesCountPerZone[zone] { + for id, inst := range r.ringDesc.Ingesters { + if inst.Zone == zone && shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) { + shard[id] = inst + } + } + + // We can go to the next zone, no need to iterate tokens. + continue + } + tokens = r.ringTokensByZone[zone] } else { // When zone-awareness is disabled, we just iterate over 1 single fake zone @@ -819,11 +845,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur instanceID := info.InstanceID instance := r.ringDesc.Ingesters[instanceID] - // The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded. - if lookbackPeriod == 0 && instance.ReadOnly { + if !shouldIncludeReadonlyInstanceInTheShard(instance, lookbackPeriod, lookbackUntil) { continue } - // Include instance in the subring. shard[instanceID] = instance @@ -855,7 +879,56 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } - // Build a read-only ring for the shard. + return r.buildRingForTheShard(shard) +} + +// shouldIncludeReadonlyInstanceInTheShard returns true if instance is not read-only, or when it is read-only and should be included in the shuffle shard. +func shouldIncludeReadonlyInstanceInTheShard(instance InstanceDesc, lookbackPeriod time.Duration, lookbackUntil int64) bool { + if !instance.ReadOnly { + return true + } + // The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded. + if lookbackPeriod == 0 { + return false + } + // With lookback period >0, read only instances are only included if they have not changed read-only status in the lookback window. + // If ReadOnlyUpdatedTimestamp is not set, we include the instance, and extend the shard later. + if lookbackPeriod > 0 && instance.ReadOnlyUpdatedTimestamp > 0 && instance.ReadOnlyUpdatedTimestamp < lookbackUntil { + return false + } + return true +} + +// filterOutReadOnlyInstances removes all read-only instances from the ring, and returns the resulting ring. +func (r *Ring) filterOutReadOnlyInstances(lookbackPeriod time.Duration, now time.Time) *Ring { + lookbackUntil := now.Add(-lookbackPeriod).Unix() + + r.mtx.RLock() + defer r.mtx.RUnlock() + + // If there are no read-only instances, there's no need to do any filtering. + if r.readOnlyInstances != nil && *r.readOnlyInstances == 0 { + return r + } + + // If all readOnlyUpdatedTimestamp values are within lookback window, we can return the ring without any filtering. + if lookbackPeriod > 0 && r.oldestReadOnlyUpdatedTimestamp != nil && *r.oldestReadOnlyUpdatedTimestamp >= lookbackUntil { + return r + } + + shard := make(map[string]InstanceDesc, len(r.ringDesc.Ingesters)) + + for id, inst := range r.ringDesc.Ingesters { + if shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) { + shard[id] = inst + } + } + + return r.buildRingForTheShard(shard) +} + +// buildRingForTheShard builds read-only ring for the shard (this ring won't be updated in the future). +func (r *Ring) buildRingForTheShard(shard map[string]InstanceDesc) *Ring { shardDesc := &Desc{Ingesters: shard} shardTokensByZone := shardDesc.getTokensByZone() shardTokens := mergeTokenGroups(shardTokensByZone) diff --git a/ring/ring_test.go b/ring/ring_test.go index 0bedff56c..368dd57a1 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -8,7 +8,6 @@ import ( "math/rand" "sort" "strconv" - "strings" "sync" "testing" "time" @@ -216,14 +215,15 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) { SubringCacheDisabled: true, ReplicationFactor: numZones, }, - ringDesc: desc, - ringTokens: desc.GetTokens(), - ringTokensByZone: desc.getTokensByZone(), - ringInstanceByToken: desc.getTokensInfo(), - ringZones: getZones(desc.getTokensByZone()), - shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), - lastTopologyChange: time.Now(), + ringDesc: desc, + ringTokens: desc.GetTokens(), + ringTokensByZone: desc.getTokensByZone(), + ringInstanceByToken: desc.getTokensInfo(), + ringZones: getZones(desc.getTokensByZone()), + instancesCountPerZone: desc.instancesCountPerZone(), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), } // Measure how long does it take for a call to succeed. t0 := time.Now() @@ -1353,6 +1353,13 @@ func TestRing_ShuffleShard(t *testing.T) { expectedSize: 0, expectedDistribution: []int{}, }, + "empty ring, shardSize=0": { + ringInstances: nil, + shardSize: 0, + zoneAwarenessEnabled: true, + expectedSize: 0, + expectedDistribution: []int{}, + }, "single zone, shard size > num instances": { ringInstances: map[string]InstanceDesc{ "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, @@ -1365,6 +1372,18 @@ func TestRing_ShuffleShard(t *testing.T) { expectedZoneCount: 1, expectedInstancesInZoneCount: map[string]int{"zone-a": 2}, }, + "single zone, shard size == 0": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, + }, + shardSize: 3, + zoneAwarenessEnabled: true, + expectedSize: 2, + expectedDistribution: []int{2}, + expectedZoneCount: 1, + expectedInstancesInZoneCount: map[string]int{"zone-a": 2}, + }, "single zone, shard size < num instances": { ringInstances: map[string]InstanceDesc{ "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, @@ -1378,7 +1397,7 @@ func TestRing_ShuffleShard(t *testing.T) { expectedZoneCount: 1, expectedInstancesInZoneCount: map[string]int{"zone-a": 2}, }, - "single zone, with read only instance": { + "single zone, with read only instance, shardSize = 3": { ringInstances: map[string]InstanceDesc{ "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, @@ -1391,6 +1410,19 @@ func TestRing_ShuffleShard(t *testing.T) { expectedZoneCount: 1, expectedInstancesInZoneCount: map[string]int{"zone-a": 2}, }, + "single zone, with read only instance, shardSize = 0": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", ReadOnly: true, Tokens: gen.GenerateTokens(128, nil)}, + }, + shardSize: 0, + zoneAwarenessEnabled: true, + expectedSize: 2, + expectedDistribution: []int{2}, + expectedZoneCount: 1, + expectedInstancesInZoneCount: map[string]int{"zone-a": 2}, + }, "multiple zones, shard size < num zones": { ringInstances: map[string]InstanceDesc{ "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, @@ -1449,7 +1481,7 @@ func TestRing_ShuffleShard(t *testing.T) { zoneAwarenessEnabled: false, expectedSize: 4, }, - "multiple zones, with read only instance": { + "multiple zones, with read only instance, shardSize=3": { ringInstances: map[string]InstanceDesc{ "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)}, @@ -1462,6 +1494,22 @@ func TestRing_ShuffleShard(t *testing.T) { expectedZoneCount: 2, expectedInstancesInZoneCount: map[string]int{"zone-a": 1, "zone-b": 1, "zone-c": 0}, }, + "multiple zones, with read only instance, shardSize=0": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil), ReadOnly: true}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil), ReadOnly: true}, + }, + shardSize: 0, + zoneAwarenessEnabled: true, + expectedSize: 4, + expectedDistribution: []int{2, 1, 1}, + expectedZoneCount: 3, + expectedInstancesInZoneCount: map[string]int{"zone-a": 2, "zone-b": 1, "zone-c": 1}, + }, "multiple zones, shard size == num instances, balanced zones": { ringInstances: map[string]InstanceDesc{ "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)}, @@ -1500,7 +1548,7 @@ func TestRing_ShuffleShard(t *testing.T) { "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)}, "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)}, }, - shardSize: 3, + shardSize: 6, zoneAwarenessEnabled: true, expectedSize: 3, expectedDistribution: []int{1, 1, 1}, @@ -1542,58 +1590,66 @@ func TestRing_ShuffleShard(t *testing.T) { } for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - // Init the ring. - ringDesc := &Desc{Ingesters: testData.ringInstances} - for id, instance := range ringDesc.Ingesters { - instance.Timestamp = time.Now().Unix() - instance.State = ACTIVE - ringDesc.Ingesters[id] = instance - } - - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, - }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - instancesCountPerZone: ringDesc.instancesCountPerZone(), - strategy: NewDefaultReplicationStrategy(), - } - - shardRing := ring.ShuffleShard("tenant-id", testData.shardSize) - assert.Equal(t, testData.expectedSize, shardRing.InstancesCount()) + for _, updateReadOnlyInstances := range []bool{false, true} { + t.Run(fmt.Sprintf("%v, updateReadOnlyInstances=%v", testName, updateReadOnlyInstances), func(t *testing.T) { + // Init the ring. + ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + instance.Timestamp = time.Now().Unix() + instance.State = ACTIVE + ringDesc.Ingesters[id] = instance + } - // Compute the actual distribution of instances across zones. - if testData.zoneAwarenessEnabled { - assert.Equal(t, testData.expectedZoneCount, shardRing.ZonesCount()) - for z, instances := range testData.expectedInstancesInZoneCount { - assert.Equal(t, instances, shardRing.InstancesInZoneCount(z)) + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + strategy: NewDefaultReplicationStrategy(), } - var actualDistribution []int + if updateReadOnlyInstances { + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() + ring.readOnlyInstances = &readOnlyInstances + ring.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp + } - if shardRing.InstancesCount() > 0 { - all, err := shardRing.GetAllHealthy(Read) - require.NoError(t, err) + shardRing := ring.ShuffleShard("tenant-id", testData.shardSize) + assert.Equal(t, testData.expectedSize, shardRing.InstancesCount()) - countByZone := map[string]int{} - for _, instance := range all.Instances { - countByZone[instance.Zone]++ + // Compute the actual distribution of instances across zones. + if testData.zoneAwarenessEnabled { + assert.Equal(t, testData.expectedZoneCount, shardRing.ZonesCount()) + for z, instances := range testData.expectedInstancesInZoneCount { + assert.Equal(t, instances, shardRing.InstancesInZoneCount(z)) } - for _, count := range countByZone { - actualDistribution = append(actualDistribution, count) + var actualDistribution []int + + if shardRing.InstancesCount() > 0 { + all, err := shardRing.GetAllHealthy(Read) + require.NoError(t, err) + + countByZone := map[string]int{} + for _, instance := range all.Instances { + countByZone[instance.Zone]++ + } + + for _, count := range countByZone { + actualDistribution = append(actualDistribution, count) + } } - } - assert.ElementsMatch(t, testData.expectedDistribution, actualDistribution) - } - }) + assert.ElementsMatch(t, testData.expectedDistribution, actualDistribution) + } + }) + } } } @@ -1688,12 +1744,13 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + strategy: NewDefaultReplicationStrategy(), } // Compute the shard for each tenant. @@ -1802,12 +1859,13 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + strategy: NewDefaultReplicationStrategy(), } // Compute the initial shard for each tenant. @@ -1851,6 +1909,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { ring.ringTokensByZone = ringDesc.getTokensByZone() ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) + ring.instancesCountPerZone = ringDesc.instancesCountPerZone() // Compute the update shard for each tenant and compare it with the initial one. // If the "consistency" property is guaranteed, we expect no more then 1 different instance @@ -1882,12 +1941,13 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + strategy: NewDefaultReplicationStrategy(), } // Get the replication set with shard size = 3. @@ -1959,12 +2019,13 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + strategy: NewDefaultReplicationStrategy(), } // Get the replication set with shard size = 2. @@ -1996,6 +2057,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { ring.ringTokensByZone = ringDesc.getTokensByZone() ring.ringInstanceByToken = ringDesc.getTokensInfo() ring.ringZones = getZones(ringDesc.getTokensByZone()) + ring.instancesCountPerZone = ringDesc.instancesCountPerZone() // Increase shard size to 6. thirdShard := ring.ShuffleShard("tenant-id", 6) @@ -2032,7 +2094,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { userID = "user-1" ) - now := time.Now() + now := time.Now().Truncate(time.Second) type event struct { what eventType @@ -2040,6 +2102,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { instanceDesc InstanceDesc shardSize int expected []string + readOnly bool readOnlyTime time.Time } @@ -2054,6 +2117,14 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { {what: test, shardSize: 1, expected: []string{"instance-1", "instance-2", "instance-3"}}, }, }, + "single zone, shard size = 0, recently bootstrapped cluster": { + timeline: []event{ + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-time.Minute))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-time.Minute))}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-time.Minute))}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-2", "instance-3"}}, + }, + }, "single zone, shard size = 1, instances scale up": { timeline: []event{ {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 3}, now.Add(-2*lookbackPeriod))}, @@ -2141,20 +2212,72 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { {what: test, shardSize: 4, expected: []string{"instance-1", "instance-2", "instance-3"}}, }, }, - "single zone, with read only instance, within lookback": { + "single zone, with read only instance, within lookback, shardSize=2": { timeline: []event{ // instance 2 is included in addition to another instance {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, - {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnlyTime: now.Add(lookbackPeriod / 2)}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod / 2)}, {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, {what: test, shardSize: 2, expected: []string{"instance-1", "instance-2", "instance-3"}}, }, }, - "single zone, with read only instance, not within lookback": { + "single zone, with read only instance, not within lookback, shardSize=2": { timeline: []event{ // readOnlyTime is too old to matter {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, - {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnlyTime: now.Add(-2 * lookbackPeriod)}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(-2 * lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 2, expected: []string{"instance-1", "instance-3"}}, + }, + }, + "single zone, with read only instance, at lookback boundary, shardSize=2": { + timeline: []event{ + // readOnlyTime is too old to matter + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 2, expected: []string{"instance-1", "instance-2", "instance-3"}}, + }, + }, + "single zone, with read only instance, with unknown ReadOnlyUpdatedTimestamp, shardSize=2": { + timeline: []event{ + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 2, expected: []string{"instance-1", "instance-2", "instance-3"}}, + }, + }, + "single zone, with read only instance, within lookback, shardSize=0": { + timeline: []event{ + // instance 2 is included in addition to another instance + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod / 2)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-2", "instance-3"}}, + }, + }, + "single zone, with read only instance, not within lookback, shardSize=0": { + timeline: []event{ + // readOnlyTime is too old to matter + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(-2 * lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-3"}}, + }, + }, + "single zone, with read only instance, at lookback boundary, shardSize=0": { + timeline: []event{ + // readOnlyTime is too old to matter + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-2", "instance-3"}}, + }, + }, + "single zone, with read only instance, with unknown ReadOnlyUpdatedTimestamp, shardSize=0": { + timeline: []event{ + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{userToken(userID, "zone-a", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true}, {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-a", []uint32{userToken(userID, "zone-a", 2) + 1}, now.Add(-2*lookbackPeriod))}, {what: test, shardSize: 2, expected: []string{"instance-1", "instance-2", "instance-3"}}, }, @@ -2255,84 +2378,166 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { {what: test, shardSize: 3, expected: []string{"instance-1", "instance-2", "instance-3", "instance-4", "instance-5", "instance-6"}}, }, }, - "multi zone, with read only instance, within lookback": { + "multi zone, with read only instance, within lookback, shardSize=3": { timeline: []event{ {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, // instance 2 and 4 are included in addition to other instances in the same zone - {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnlyTime: now.Add(lookbackPeriod / 2)}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod / 2)}, {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, - {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod)), readOnlyTime: now.Add(lookbackPeriod / 2)}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod / 2)}, {what: test, shardSize: 3, expected: []string{"instance-1", "instance-2", "instance-3", "instance-4", "instance-6"}}, }, }, - "multi zone, with read only instance, not within lookback": { + "multi zone, with read only instance, at lookback boundary, shardSize=3": { timeline: []event{ // readOnlyTime is too old to matter {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, - {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnlyTime: now.Add(-2 * lookbackPeriod)}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 3, expected: []string{"instance-1", "instance-2", "instance-3", "instance-6"}}, + }, + }, + "multi zone, with read only instance, not within lookback, shardSize=3": { + timeline: []event{ + // readOnlyTime is too old to matter + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(-2 * lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 3, expected: []string{"instance-1", "instance-3", "instance-6"}}, + }, + }, + "multi zone, with read only instance, with unknown ReadOnlyUpdatedTimestamp, shardSize=3": { + timeline: []event{ + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + // instance 2 and 4 are included in addition to other instances in the same zone + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true}, {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, - {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod)), readOnlyTime: now.Add(-2 * lookbackPeriod)}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true}, {what: test, shardSize: 3, expected: []string{"instance-1", "instance-2", "instance-3", "instance-4", "instance-6"}}, }, }, + "multi zone, with read only instance, within lookback, shardSize=0": { + timeline: []event{ + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod / 2)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod / 2)}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-2", "instance-3", "instance-4", "instance-5", "instance-6"}}, + }, + }, + "multi zone, with read only instance, at lookback boundary, shardSize=0": { + timeline: []event{ + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-2", "instance-3", "instance-4", "instance-5", "instance-6"}}, + }, + }, + "multi zone, with read only instance, not within lookback, shardSize=0": { + timeline: []event{ + // readOnlyTime is too old to matter + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true, readOnlyTime: now.Add(-2 * lookbackPeriod)}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod))}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-3", "instance-4", "instance-5", "instance-6"}}, + }, + }, + "multi zone, with read only instance, with unknown ReadOnlyUpdatedTimestamp, shardSize=0": { + timeline: []event{ + {what: add, instanceID: "instance-1", instanceDesc: generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{userToken(userID, "zone-a", 0) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-2", instanceDesc: generateRingInstanceWithInfo("instance-2", "zone-b", []uint32{userToken(userID, "zone-b", 1) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true}, + {what: add, instanceID: "instance-3", instanceDesc: generateRingInstanceWithInfo("instance-3", "zone-b", []uint32{userToken(userID, "zone-b", 2) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-4", instanceDesc: generateRingInstanceWithInfo("instance-4", "zone-c", []uint32{userToken(userID, "zone-c", 3) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-5", instanceDesc: generateRingInstanceWithInfo("instance-5", "zone-c", []uint32{userToken(userID, "zone-c", 4) + 1}, now.Add(-2*lookbackPeriod))}, + {what: add, instanceID: "instance-6", instanceDesc: generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 5) + 1}, now.Add(-2*lookbackPeriod)), readOnly: true}, + {what: test, shardSize: 0, expected: []string{"instance-1", "instance-2", "instance-3", "instance-4", "instance-5", "instance-6"}}, + }, + }, } - for _, updateRegisteredTimestampCache := range []bool{false, true} { - for testName, testData := range tests { - t.Run(fmt.Sprintf("%s/%v", testName, updateRegisteredTimestampCache), func(t *testing.T) { - // Initialise the ring. - ringDesc := &Desc{Ingesters: map[string]InstanceDesc{}} - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - ZoneAwarenessEnabled: true, - }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), - } + for testName, testData := range tests { + for _, updateRegisteredTimestampCache := range []bool{false, true} { + for _, updateReadOnlyInstances := range []bool{false, true} { + t.Run(fmt.Sprintf("%s/%v/%v", testName, updateRegisteredTimestampCache, updateReadOnlyInstances), func(t *testing.T) { + // Initialise the ring. + ringDesc := &Desc{Ingesters: map[string]InstanceDesc{}} + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + strategy: NewDefaultReplicationStrategy(), + } - // Replay the events on the timeline. - for ix, event := range testData.timeline { - switch event.what { - case add: - if !event.readOnlyTime.IsZero() { - event.instanceDesc.ReadOnly = true + // Replay the events on the timeline. + for ix, event := range testData.timeline { + switch event.what { + case add: + event.instanceDesc.ReadOnly = event.readOnly event.instanceDesc.ReadOnlyUpdatedTimestamp = timeToUnixSecons(event.readOnlyTime) + ringDesc.Ingesters[event.instanceID] = event.instanceDesc + + ring.ringTokens = ringDesc.GetTokens() + ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() + ring.ringZones = getZones(ringDesc.getTokensByZone()) + ring.instancesCountPerZone = ringDesc.instancesCountPerZone() + if updateRegisteredTimestampCache { + ring.oldestRegisteredTimestamp = ringDesc.getOldestRegisteredTimestamp() + } + if updateReadOnlyInstances { + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() + ring.readOnlyInstances = &readOnlyInstances + ring.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp + } + case remove: + delete(ringDesc.Ingesters, event.instanceID) + + ring.ringTokens = ringDesc.GetTokens() + ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() + ring.ringZones = getZones(ringDesc.getTokensByZone()) + ring.instancesCountPerZone = ringDesc.instancesCountPerZone() + if updateRegisteredTimestampCache { + ring.oldestRegisteredTimestamp = ringDesc.getOldestRegisteredTimestamp() + } + if updateReadOnlyInstances { + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() + ring.readOnlyInstances = &readOnlyInstances + ring.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp + } + case test: + rs, err := ring.ShuffleShardWithLookback(userID, event.shardSize, lookbackPeriod, now).GetAllHealthy(Read) + require.NoError(t, err) + assert.ElementsMatch(t, event.expected, rs.GetAddresses(), "step %d", ix) } - ringDesc.Ingesters[event.instanceID] = event.instanceDesc - - ring.ringTokens = ringDesc.GetTokens() - ring.ringTokensByZone = ringDesc.getTokensByZone() - ring.ringInstanceByToken = ringDesc.getTokensInfo() - ring.ringZones = getZones(ringDesc.getTokensByZone()) - if updateRegisteredTimestampCache { - ring.oldestRegisteredTimestamp = ringDesc.getOldestRegisteredTimestamp() - } - case remove: - delete(ringDesc.Ingesters, event.instanceID) - - ring.ringTokens = ringDesc.GetTokens() - ring.ringTokensByZone = ringDesc.getTokensByZone() - ring.ringInstanceByToken = ringDesc.getTokensInfo() - ring.ringZones = getZones(ringDesc.getTokensByZone()) - if updateRegisteredTimestampCache { - ring.oldestRegisteredTimestamp = ringDesc.getOldestRegisteredTimestamp() - } - case test: - rs, err := ring.ShuffleShardWithLookback(userID, event.shardSize, lookbackPeriod, now).GetAllHealthy(Read) - require.NoError(t, err) - assert.ElementsMatch(t, event.expected, rs.GetAddresses(), "step %d", ix) } - } - }) + }) + } } } } @@ -2354,181 +2559,194 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { for _, updateOldestRegisteredTimestamp := range []bool{false, true} { updateOldestRegisteredTimestamp := updateOldestRegisteredTimestamp - for _, numInstances := range numInitialInstances { - numInstances := numInstances + for _, updateReadOnlyInstances := range []bool{false, true} { + updateReadOnlyInstances := updateReadOnlyInstances - for _, numZones := range numInitialZones { - numZones := numZones + for _, numInstances := range numInitialInstances { + numInstances := numInstances - testName := fmt.Sprintf("num instances = %d, num zones = %d, update oldest registered timestamp = %v", numInstances, numZones, updateOldestRegisteredTimestamp) + for _, numZones := range numInitialZones { + numZones := numZones - t.Run(testName, func(t *testing.T) { - t.Parallel() + testName := fmt.Sprintf("num instances = %d, num zones = %d, update oldest registered timestamp = %v, update read only instances = %v", numInstances, numZones, updateOldestRegisteredTimestamp, updateReadOnlyInstances) - // Randomise the seed but log it in case we need to reproduce the test on failure. - seed := time.Now().UnixNano() - rnd := rand.New(rand.NewSource(seed)) - t.Log("random generator seed:", seed) - gen := NewRandomTokenGeneratorWithSeed(seed) + t.Run(testName, func(t *testing.T) { + t.Parallel() - // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(gen, numInstances, numZones, 128)} - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - ZoneAwarenessEnabled: true, - ReplicationFactor: 3, - }, - ringDesc: ringDesc, - strategy: NewDefaultReplicationStrategy(), - } - updateRing := func() { - ring.ringTokens = ringDesc.GetTokens() - ring.ringTokensByZone = ringDesc.getTokensByZone() - ring.ringInstanceByToken = ringDesc.getTokensInfo() - ring.ringZones = getZones(ringDesc.getTokensByZone()) - ring.instancesCountPerZone = ringDesc.instancesCountPerZone() - if updateOldestRegisteredTimestamp { - ring.oldestRegisteredTimestamp = ringDesc.getOldestRegisteredTimestamp() + // Randomise the seed but log it in case we need to reproduce the test on failure. + seed := time.Now().UnixNano() + rnd := rand.New(rand.NewSource(seed)) + t.Log("random generator seed:", seed) + gen := NewRandomTokenGeneratorWithSeed(seed) + + // Initialise the ring. + ringDesc := &Desc{Ingesters: generateRingInstances(gen, numInstances, numZones, 128)} + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + ReplicationFactor: 3, + }, + ringDesc: ringDesc, + strategy: NewDefaultReplicationStrategy(), } + updateRing := func() { + ring.ringTokens = ringDesc.GetTokens() + ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() + ring.ringZones = getZones(ringDesc.getTokensByZone()) + ring.instancesCountPerZone = ringDesc.instancesCountPerZone() + if updateOldestRegisteredTimestamp { + ring.oldestRegisteredTimestamp = ringDesc.getOldestRegisteredTimestamp() + } + if updateReadOnlyInstances { + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() + ring.readOnlyInstances = &readOnlyInstances + ring.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp + } - if len(ring.ringZones) != numZones { - t.Fatalf("number of zones changed, original=%d, current zones=%v", numZones, ring.ringZones) + if len(ring.ringZones) != numZones { + t.Fatalf("number of zones changed, original=%d, current zones=%v", numZones, ring.ringZones) + } } - } - updateRing() + updateRing() - // The simulation starts with the minimum shard size. Random events can later increase it. - shardSize := numZones + // The simulation starts with the minimum shard size. Random events can later increase it. + shardSize := numZones - // The simulation assumes the initial ring contains instances registered - // since more than the lookback period. - currTime := time.Now().Add(lookbackPeriod).Add(time.Minute) + // The simulation assumes the initial ring contains instances registered + // since more than the lookback period. + currTime := time.Now().Add(lookbackPeriod).Add(time.Minute) - // Add the initial shard to the history. - rs, err := ring.shuffleShard(userID, shardSize, 0, currTime).GetReplicationSetForOperation(Read) - require.NoError(t, err) + // Add the initial shard to the history. + rs, err := ring.shuffleShard(userID, shardSize, 0, currTime).GetReplicationSetForOperation(Read) + require.NoError(t, err) - type historyEntry struct { - ReplicationSet - shardSize int - time.Time - } - // events, indexed by event id. - history := map[int]historyEntry{ - 0: {rs, shardSize, currTime}, - } + type historyEntry struct { + ReplicationSet + shardSize int + time.Time + } + // events, indexed by event id. + history := map[int]historyEntry{ + 0: {rs, shardSize, currTime}, + } - // Track instances that have been marked as read-only - readOnlyInstances := make(map[string]InstanceDesc) - - // Simulate a progression of random events over the time and, at each iteration of the simulation, - // make sure the subring includes all non-removed instances picked from previous versions of the - // ring up until the lookback period. - nextInstanceID := len(ringDesc.Ingesters) + 1 - - for i := 1; i <= numEvents; i++ { - currTime = currTime.Add(delayBetweenEvents) - - switch r := rnd.Intn(100); { - case r < 60: - // Scale up instances by 1. - instanceID := fmt.Sprintf("instance-%d", nextInstanceID) - zoneID := fmt.Sprintf("zone-%d", nextInstanceID%numZones) - nextInstanceID++ - ringDesc.Ingesters[instanceID] = generateRingInstanceWithInfo(instanceID, zoneID, gen.GenerateTokens(128, ringDesc.GetTokens()), currTime) - updateRing() - t.Logf("%d: added instance %s", i, instanceID) - - case r < 70: - // Scale down instances by 1. - idToRemove := getRandomInstanceID(ringDesc.Ingesters, rnd) - zone := ringDesc.Ingesters[idToRemove].Zone - // Don't remove instance if it is the last instance in the zone, - // because sharding works differently for different number of zones. - if ring.instancesCountPerZone[zone] <= 1 { - t.Logf("%d: not removing last instance %s from zone %s", i, idToRemove, zone) - break - } + // Track instances that have been marked as read-only + readOnlyInstances := make(map[string]InstanceDesc) + + // Simulate a progression of random events over the time and, at each iteration of the simulation, + // make sure the subring includes all non-removed instances picked from previous versions of the + // ring up until the lookback period. + nextInstanceID := len(ringDesc.Ingesters) + 1 + + for eventID := 1; eventID <= numEvents; eventID++ { + currTime = currTime.Add(delayBetweenEvents) + + switch r := rnd.Intn(100); { + case r < 60: + // Scale up instances by 1. + instanceID := fmt.Sprintf("instance-%d", nextInstanceID) + zoneID := fmt.Sprintf("zone-%d", nextInstanceID%numZones) + nextInstanceID++ + ringDesc.Ingesters[instanceID] = generateRingInstanceWithInfo(instanceID, zoneID, gen.GenerateTokens(128, ringDesc.GetTokens()), currTime) + updateRing() + t.Logf("%d (%v): added instance %s, total instances %d", eventID, currTime.Format("03:04"), instanceID, len(ringDesc.Ingesters)) + + case r < 70: + // Scale down instances by 1. + idToRemove := getRandomInstanceID(ringDesc.Ingesters, rnd) + zone := ringDesc.Ingesters[idToRemove].Zone + // Don't remove instance if it is the last instance in the zone, + // because sharding works differently for different number of zones. + if ring.instancesCountPerZone[zone] <= 1 { + t.Logf("%d (%v): not removing last instance %s from zone %s", eventID, currTime.Format("03:04"), idToRemove, zone) + break + } - delete(ringDesc.Ingesters, idToRemove) - updateRing() - t.Logf("%d: removed instance %s", i, idToRemove) - - // Remove the terminated instance from the history. - for ringTime, ringState := range history { - for idx, desc := range ringState.Instances { - // In this simulation instance ID == instance address. - if desc.Addr == idToRemove { - ringState.Instances = append(ringState.Instances[:idx], ringState.Instances[idx+1:]...) - history[ringTime] = ringState - break + delete(ringDesc.Ingesters, idToRemove) + updateRing() + t.Logf("%d (%v): removed instance %s, total instances %d", eventID, currTime.Format("03:04"), idToRemove, len(ringDesc.Ingesters)) + + // Remove the terminated instance from the history. + for eid, ringState := range history { + for idx, desc := range ringState.Instances { + // In this simulation instance ID == instance address. + if desc.Addr == idToRemove { + ringState.Instances = append(ringState.Instances[:idx], ringState.Instances[idx+1:]...) + history[eid] = ringState + break + } } } - } - // Removed instance can't be read-only. - delete(readOnlyInstances, idToRemove) - - case r < 80: - // Set an instance to read only - instanceID := getRandomInstanceID(ringDesc.Ingesters, rnd) - instanceDesc := ringDesc.Ingesters[instanceID] - if !instanceDesc.ReadOnly { - instanceDesc.ReadOnly = true - instanceDesc.ReadOnlyUpdatedTimestamp = currTime.Unix() - ringDesc.Ingesters[instanceID] = instanceDesc - readOnlyInstances[instanceID] = instanceDesc - t.Logf("%d: switched instance %s to read-only", i, instanceID) - } else { - t.Logf("%d: instance %s is already read-only, not switching", i, instanceID) - } + // Removed instance can't be read-only. + delete(readOnlyInstances, idToRemove) - case r < 90: - // Set a read-only instance back to read-write - if len(readOnlyInstances) > 0 { - instanceID := getRandomInstanceID(readOnlyInstances, rnd) + case r < 80: + // Set an instance to read only + instanceID := getRandomInstanceID(ringDesc.Ingesters, rnd) instanceDesc := ringDesc.Ingesters[instanceID] - instanceDesc.ReadOnly = false - instanceDesc.ReadOnlyUpdatedTimestamp = currTime.Unix() - ringDesc.Ingesters[instanceID] = instanceDesc - delete(readOnlyInstances, instanceID) - t.Logf("%d: switched instance %s to read-write", i, instanceID) - } else { - t.Logf("%d: no instance to switch to read-only found", i) - } - default: - // Scale up shard size (keeping the per-zone balance). - shardSize += numZones - t.Logf("%d: increased shard size %d", i, shardSize) - } - - // Add the current shard to the history. - rs, err = ring.shuffleShard(userID, shardSize, 0, currTime).GetReplicationSetForOperation(Read) - require.NoError(t, err) - history[i] = historyEntry{rs, shardSize, currTime} - - // Ensure the shard with lookback includes all instances from previous states of the ring. - rsWithLookback, err := ring.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, currTime).GetReplicationSetForOperation(Read) - require.NoError(t, err) - t.Logf("%d: history for event=%v, shardSize=%v, lookbackPeriod=%v,\nrs=%v,\nrsWithLookback=%v", i, currTime.Format("03:04"), shardSize, lookbackPeriod, getSortedAddresses(rs), getSortedAddresses(rsWithLookback)) + if !instanceDesc.ReadOnly { + instanceDesc.ReadOnly = true + instanceDesc.ReadOnlyUpdatedTimestamp = currTime.Unix() + ringDesc.Ingesters[instanceID] = instanceDesc + updateRing() + + readOnlyInstances[instanceID] = instanceDesc + t.Logf("%d (%v): switched instance %s to read-only", eventID, currTime.Format("03:04"), instanceID) + } else { + t.Logf("%d (%v): instance %s is already read-only, not switching", eventID, currTime.Format("03:04"), instanceID) + } - for ix, ringState := range history { - if ringState.Time.Before(currTime.Add(-lookbackPeriod)) { - // This entry from the history is obsolete, we can remove it. - delete(history, ix) - continue + case r < 90: + // Set a read-only instance back to read-write + if len(readOnlyInstances) > 0 { + instanceID := getRandomInstanceID(readOnlyInstances, rnd) + instanceDesc := ringDesc.Ingesters[instanceID] + instanceDesc.ReadOnly = false + instanceDesc.ReadOnlyUpdatedTimestamp = currTime.Unix() + ringDesc.Ingesters[instanceID] = instanceDesc + updateRing() + + delete(readOnlyInstances, instanceID) + t.Logf("%d (%v): switched instance %s to read-write", eventID, currTime.Format("03:04"), instanceID) + } else { + t.Logf("%d (%v): no instance to switch to read-only found", eventID, currTime.Format("03:04")) + } + default: + // Scale up shard size (keeping the per-zone balance). + shardSize += numZones + t.Logf("%d (%v): increased shard size %d", eventID, currTime.Format("03:04"), shardSize) } - for _, desc := range ringState.Instances { - if !rsWithLookback.Includes(desc.Addr) && !desc.ReadOnly { - t.Fatalf("subring generated after event %d %v is expected to include instance %s from ring state but it's missing (actual instances are: %s)", - ix, ringState.Time.Format("03:04"), desc.Addr, strings.Join(rsWithLookback.GetAddresses(), ", ")) + // Add the current shard to the history. + rs, err = ring.shuffleShard(userID, shardSize, 0, currTime).GetReplicationSetForOperation(Read) + require.NoError(t, err) + history[eventID] = historyEntry{rs, shardSize, currTime} + + // Ensure the shard with lookback includes all instances from previous states of the ring. + rsWithLookback, err := ring.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, currTime).GetReplicationSetForOperation(Read) + require.NoError(t, err) + t.Logf("%d: subrings for event=%v, shardSize=%v, lookbackPeriod=%v,\nrs=%v,\nrsWithLookback=%v", eventID, currTime.Format("03:04"), shardSize, lookbackPeriod, getSortedAddresses(rs), getSortedAddresses(rsWithLookback)) + + for ix, ringState := range history { + if ringState.Time.Before(currTime.Add(-lookbackPeriod)) { + // This entry from the history is obsolete, we can remove it. + delete(history, ix) + continue + } + + for _, desc := range ringState.Instances { + if !rsWithLookback.Includes(desc.Addr) && !desc.ReadOnly { + t.Fatalf("%d (%v) new shuffle shard with lookback is expected to include instance %s from ring state after event %d but it's missing (actual instances are: %v)", + eventID, currTime.Format("03:04"), desc.Addr, ix, getSortedAddresses(rsWithLookback)) + } } } } - } - }) + }) + } } } } @@ -3008,6 +3226,12 @@ func TestRing_ShuffleShardWithLookback_CachingAfterTopologyChange(t *testing.T) require.Equal(t, 1, second.InstancesInZoneCount("zone-c")) } +func makeReadOnly(desc InstanceDesc, ts time.Time) InstanceDesc { + desc.ReadOnly = true + desc.ReadOnlyUpdatedTimestamp = ts.Unix() + return desc +} + func TestRing_ShuffleShardWithLookback_CachingAfterReadOnlyChange(t *testing.T) { cfg := Config{KVStore: kv.Config{}, ReplicationFactor: 1, ZoneAwarenessEnabled: true} registry := prometheus.NewRegistry() @@ -3027,11 +3251,6 @@ func TestRing_ShuffleShardWithLookback_CachingAfterReadOnlyChange(t *testing.T) "instance-6": generateRingInstanceWithInfo("instance-6", "zone-c", []uint32{userToken(userID, "zone-c", 1) + 1}, now.Add(-2*time.Hour)), } } - makeReadOnly := func(desc InstanceDesc) InstanceDesc { - desc.ReadOnly = true - desc.ReadOnlyUpdatedTimestamp = time.Now().Unix() - return desc - } initialRingDesc := &Desc{Ingesters: makeInstances()} ring.updateRingState(initialRingDesc) @@ -3048,8 +3267,8 @@ func TestRing_ShuffleShardWithLookback_CachingAfterReadOnlyChange(t *testing.T) require.Equal(t, 1, first.InstancesInZoneCount("zone-c")) updatedInstances := makeInstances() - updatedInstances["instance-1"] = makeReadOnly(updatedInstances["instance-1"]) - updatedInstances["instance-5"] = makeReadOnly(updatedInstances["instance-5"]) + updatedInstances["instance-1"] = makeReadOnly(updatedInstances["instance-1"], now) + updatedInstances["instance-5"] = makeReadOnly(updatedInstances["instance-5"], now) updatedRingDesc := &Desc{Ingesters: updatedInstances} ring.updateRingState(updatedRingDesc) @@ -3155,46 +3374,50 @@ func TestRing_ShuffleShardWithLookback_CachingConcurrency(t *testing.T) { // Add some instances to the ring. ringDesc := &Desc{Ingesters: map[string]InstanceDesc{ "instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), - "instance-2": generateRingInstanceWithInfo("instance-2", "zone-a", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), + "instance-2": makeReadOnly(generateRingInstanceWithInfo("instance-2", "zone-a", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), now), "instance-3": generateRingInstanceWithInfo("instance-3", "zone-b", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), "instance-4": generateRingInstanceWithInfo("instance-4", "zone-b", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), "instance-5": generateRingInstanceWithInfo("instance-5", "zone-c", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), - "instance-6": generateRingInstanceWithInfo("instance-6", "zone-c", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), + "instance-6": makeReadOnly(generateRingInstanceWithInfo("instance-6", "zone-c", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), now.Add(-2*time.Hour)), }} ring.updateRingState(ringDesc) - // Start the workers. - wg := sync.WaitGroup{} - wg.Add(numWorkers) + for _, shardSize := range []int{3, 0} { + t.Run(fmt.Sprintf("shardSize=%d", shardSize), func(t *testing.T) { + // Start the workers. + wg := sync.WaitGroup{} + wg.Add(numWorkers) - for w := 0; w < numWorkers; w++ { - go func(workerID int) { - defer wg.Done() + for w := 0; w < numWorkers; w++ { + go func(workerID int) { + defer wg.Done() - // Get the subring once. This is the one expected from subsequent requests. - userID := fmt.Sprintf("user-%d", workerID) - expected := ring.ShuffleShardWithLookback(userID, 3, time.Hour, now) + // Get the subring once. This is the one expected from subsequent requests. + userID := fmt.Sprintf("user-%d", workerID) + expected := ring.ShuffleShardWithLookback(userID, shardSize, time.Hour, now) - for r := 0; r < numRequestsPerWorker; r++ { - actual := ring.ShuffleShardWithLookback(userID, 3, time.Hour, now) - require.Equal(t, expected, actual) + for r := 0; r < numRequestsPerWorker; r++ { + actual := ring.ShuffleShardWithLookback(userID, shardSize, time.Hour, now) + require.Equal(t, expected, actual) - // Get the subring for a new user each time too, in order to stress the setter too - // (if we only read from the cache there's no read/write concurrent access). - ring.ShuffleShardWithLookback(fmt.Sprintf("stress-%d", r), 3, time.Hour, now) + // Get the subring for a new user each time too, in order to stress the setter too + // (if we only read from the cache there's no read/write concurrent access). + ring.ShuffleShardWithLookback(fmt.Sprintf("stress-%d", r), shardSize, time.Hour, now) + } + }(w) } - }(w) - } - // Wait until all workers have done. - wg.Wait() + // Wait until all workers have done. + wg.Wait() + }) + } } func BenchmarkRing_ShuffleShard(b *testing.B) { for _, numInstances := range []int{50, 100, 1000} { for _, numZones := range []int{1, 3} { - for _, shardSize := range []int{0, 3, 10, 30} { + for _, shardSize := range []int{0, 3, 10, 30, 100, 1000} { b.Run(fmt.Sprintf("num instances = %d, num zones = %d, shard size = %d", numInstances, numZones, shardSize), func(b *testing.B) { benchmarkShuffleSharding(b, numInstances, numZones, 128, shardSize, false) }) @@ -3206,7 +3429,7 @@ func BenchmarkRing_ShuffleShard(b *testing.B) { func BenchmarkRing_ShuffleShardCached(b *testing.B) { for _, numInstances := range []int{50, 100, 1000} { for _, numZones := range []int{1, 3} { - for _, shardSize := range []int{0, 3, 10, 30} { + for _, shardSize := range []int{0, 3, 10, 30, 100, 1000} { b.Run(fmt.Sprintf("num instances = %d, num zones = %d, shard size = %d", numInstances, numZones, shardSize), func(b *testing.B) { benchmarkShuffleSharding(b, numInstances, numZones, 128, shardSize, true) }) @@ -3255,16 +3478,20 @@ func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, s // Initialise the ring. ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(b), numInstances, numZones, numTokens)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: !cache}, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), - lastTopologyChange: time.Now(), + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: !cache}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), } + readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() + ring.readOnlyInstances = &readOnlyInstances + ring.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp b.ResetTimer() @@ -3311,14 +3538,15 @@ func BenchmarkRing_Get(b *testing.B) { SubringCacheDisabled: true, ReplicationFactor: benchCase.replicationFactor, }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), - lastTopologyChange: time.Now(), + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), } buf, bufHosts, bufZones := MakeBuffersForGet() @@ -3343,15 +3571,16 @@ func TestRing_Get_NoMemoryAllocations(t *testing.T) { // Initialise the ring. ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), 3, 3, 128)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: 3}, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), - lastTopologyChange: time.Now(), + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: 3}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + instancesCountPerZone: ringDesc.instancesCountPerZone(), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), } buf, bufHosts, bufZones := MakeBuffersForGet()