Skip to content

Commit

Permalink
Moved instance-filtering back to shuffleShard, added instance filteri…
Browse files Browse the repository at this point in the history
…ng when number of desired instances is >= number of instances in the zone.
  • Loading branch information
pstibrany committed Aug 22, 2024
1 parent a59aa92 commit 94800a1
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 113 deletions.
95 changes: 48 additions & 47 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,13 +709,11 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
if cached := r.getCachedShuffledSubring(identifier, size); cached != nil {
return cached
}

var result *Ring
if size <= 0 {
result = r.filterOutReadOnlyInstances(0, time.Now())
} else {
result = r.shuffleShard(identifier, size, 0, time.Now())
size = math.MaxInt
}
result := r.shuffleShard(identifier, size, 0, time.Now())

// Only cache subring if it is different from this ring, to avoid deadlocks in getCachedShuffledSubring,
// when we update the cached ring.
if result != r {
Expand All @@ -740,13 +738,10 @@ func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPer
return cached
}

var result *Ring
if size <= 0 {
result = r.filterOutReadOnlyInstances(lookbackPeriod, now)
} else {
result = r.shuffleShard(identifier, size, lookbackPeriod, now)
size = math.MaxInt
}

result := r.shuffleShard(identifier, size, lookbackPeriod, now)
if result != r {
r.setCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now, result)
}
Expand All @@ -766,30 +761,66 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
//
// If any instance had RegisteredTimestamp equal to 0 (it would not cause additional lookup of next instance),
// then r.oldestRegisteredTimestamp is zero too, and we skip this optimization.
//
// Even if some instances are read-only, they must have changed their read-only status within lookback window
// (because they were all registered within lookback window), so they would be included in the result.
if lookbackPeriod > 0 && r.oldestRegisteredTimestamp > 0 && r.oldestRegisteredTimestamp >= lookbackUntil {
// Even if some ingesters are read only, they must have changed their read-only status within lookback window
// (because they were all registered within lookback window), so they would be included.
return r
}

var numInstancesPerZone int
var actualZones []string

shard := make(map[string]InstanceDesc, min(len(r.ringDesc.Ingesters), size))

if r.cfg.ZoneAwarenessEnabled {
numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones))
actualZones = r.ringZones
} else {
numInstancesPerZone = size
actualZones = []string{""}
}

shard := make(map[string]InstanceDesc, min(len(r.ringDesc.Ingesters), size))
// If we're including all instances from the ring, we can simply filter out unwanted instances, and avoid
// iterating through tokens.
if numInstancesPerZone >= len(r.ringDesc.Ingesters) {
// If there are no read-only instances, there's no need to do any filtering.
if r.readOnlyInstances != nil && *r.readOnlyInstances == 0 {
return r
}

// If all readOnlyUpdatedTimestamp values are within lookback window, we can return the ring without any filtering.
if lookbackPeriod > 0 && r.oldestReadOnlyUpdatedTimestamp != nil && *r.oldestReadOnlyUpdatedTimestamp >= lookbackUntil {
return r
}

for id, inst := range r.ringDesc.Ingesters {
if shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) {
shard[id] = inst
}
}

return r.buildRingForTheShard(shard)
}
}

// We need to iterate zones always in the same order to guarantee stability.
for _, zone := range actualZones {
var tokens []uint32

if r.cfg.ZoneAwarenessEnabled {
// If we're going to include all instances from this zone, we can simply filter out
// unwanted instances, and avoid iterating through tokens.
if numInstancesPerZone >= r.instancesCountPerZone[zone] {
for id, inst := range r.ringDesc.Ingesters {
if inst.Zone == zone && shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) {
shard[id] = inst
}
}

// We can go to the next zone, no need to iterate tokens.
continue
}

tokens = r.ringTokensByZone[zone]
} else {
// When zone-awareness is disabled, we just iterate over 1 single fake zone
Expand Down Expand Up @@ -865,7 +896,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
}
}

return r.buildReadOnlyRingForTheShard(shard)
return r.buildRingForTheShard(shard)
}

// shouldIncludeReadonlyInstanceInTheShard returns true if instance is not read-only, or when it is read-only and should be included in the shuffle shard.
Expand All @@ -885,38 +916,8 @@ func shouldIncludeReadonlyInstanceInTheShard(instance InstanceDesc, lookbackPeri
return true
}

// filterOutReadOnlyInstances removes all read-only instances from the ring, and returns the resulting ring.
// When lookback period > 0, only read-only instances that have changed state within outside of the lookback window are filtered,
// to be consistent with shuffleShard function.
func (r *Ring) filterOutReadOnlyInstances(lookbackPeriod time.Duration, now time.Time) *Ring {
lookbackUntil := now.Add(-lookbackPeriod).Unix()

r.mtx.RLock()
defer r.mtx.RUnlock()

// If there are no read-only instances, there's no need to do any filtering.
if r.readOnlyInstances != nil && *r.readOnlyInstances == 0 {
return r
}

// If all readOnlyUpdatedTimestamp values are within lookback window, we can return the ring without any filtering.
if lookbackPeriod > 0 && r.oldestReadOnlyUpdatedTimestamp != nil && *r.oldestReadOnlyUpdatedTimestamp >= lookbackUntil {
return r
}

shard := make(map[string]InstanceDesc, len(r.ringDesc.Ingesters))

for id, inst := range r.ringDesc.Ingesters {
if shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) {
shard[id] = inst
}
}

return r.buildReadOnlyRingForTheShard(shard)
}

// buildReadOnlyRingForTheShard builds read-only ring for the shard (this ring won't be updated in the future).
func (r *Ring) buildReadOnlyRingForTheShard(shard map[string]InstanceDesc) *Ring {
// buildRingForTheShard builds read-only ring for the shard (this ring won't be updated in the future).
func (r *Ring) buildRingForTheShard(shard map[string]InstanceDesc) *Ring {
shardDesc := &Desc{Ingesters: shard}
shardTokensByZone := shardDesc.getTokensByZone()
shardTokens := mergeTokenGroups(shardTokensByZone)
Expand Down
Loading

0 comments on commit 94800a1

Please sign in to comment.