Skip to content

Commit

Permalink
Filter out read-only instances, but skip filtering when it's not requ…
Browse files Browse the repository at this point in the history
…ired. (#565)

* Filter out read-only instances, but skip filtering when it's not required.

---------

Co-authored-by: Jonathan Halterman <jonathan@grafana.com>
  • Loading branch information
pstibrany and jhalterman authored Aug 23, 2024
1 parent f25f206 commit 70f84e5
Show file tree
Hide file tree
Showing 3 changed files with 687 additions and 362 deletions.
23 changes: 23 additions & 0 deletions ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,29 @@ func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int {
return instancesCountPerZone
}

func (d *Desc) readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp() (int, int64) {
readOnlyInstances := 0
oldestReadOnlyUpdatedTimestamp := int64(0)
first := true

if d != nil {
for _, ingester := range d.Ingesters {
if !ingester.ReadOnly {
continue
}

readOnlyInstances++
if first {
oldestReadOnlyUpdatedTimestamp = ingester.ReadOnlyUpdatedTimestamp
} else {
oldestReadOnlyUpdatedTimestamp = min(oldestReadOnlyUpdatedTimestamp, ingester.ReadOnlyUpdatedTimestamp)
}
first = false
}
}
return readOnlyInstances, oldestReadOnlyUpdatedTimestamp
}

type CompareResult int

// CompareResult responses
Expand Down
109 changes: 91 additions & 18 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ type Ring struct {
// then this value will be 0.
oldestRegisteredTimestamp int64

readOnlyInstances *int // Number of instances with ReadOnly flag set. Only valid if not nil.
// Oldest value of ReadOnlyUpdatedTimestamp for read-only instances. If there are no read-only instances,
// or if any read-only instance has ReadOnlyUpdatedTimestamp == 0 (which should not happen), then this value will be 0.
// Only valid if not nil.
oldestReadOnlyUpdatedTimestamp *int64

// Maps a token with the information of the instance holding it. This map is immutable and
// cannot be changed in place because it's shared "as is" between subrings (the only way to
// change it is to create a new one and replace it).
Expand Down Expand Up @@ -372,6 +378,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone()
writableInstancesWithTokensCount := ringDesc.writableInstancesWithTokensCount()
writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone()
readOnlyInstances, oldestReadOnlyUpdatedTimestamp := ringDesc.readOnlyInstancesAndOldestReadOnlyUpdatedTimestamp()

r.mtx.Lock()
defer r.mtx.Unlock()
Expand All @@ -387,6 +394,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone
r.oldestRegisteredTimestamp = oldestRegisteredTimestamp
r.lastTopologyChange = now
r.readOnlyInstances = &readOnlyInstances
r.oldestReadOnlyUpdatedTimestamp = &oldestReadOnlyUpdatedTimestamp

// Invalidate all cached subrings.
if r.shuffledSubringCache != nil {
Expand Down Expand Up @@ -697,18 +706,16 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
//
// Subring returned by this method does not contain instances that have read-only field set.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
// Use all possible instances if shuffle sharding is disabled. We don't set size to r.InstancesCount(), because
// that could lead to not all instances being returned when ring zones are unbalanced.
// Reason for not returning entire ring directly is that we need to filter out read-only instances.
if size <= 0 {
size = math.MaxInt
}

if cached := r.getCachedShuffledSubring(identifier, size); cached != nil {
return cached
}

result := r.shuffleShard(identifier, size, 0, time.Now())
var result *Ring
if size <= 0 {
result = r.filterOutReadOnlyInstances(0, time.Now())
} else {
result = r.shuffleShard(identifier, size, 0, time.Now())
}
// Only cache subring if it is different from this ring, to avoid deadlocks in getCachedShuffledSubring,
// when we update the cached ring.
if result != r {
Expand All @@ -725,17 +732,20 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
//
// This function supports caching, but the cache will only be effective if successive calls for the
// same identifier are with the same lookbackPeriod and increasing values of now.
//
// Subring returned by this method does not contain read-only instances that have changed their state
// before the lookback period.
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}

if cached := r.getCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now); cached != nil {
return cached
}

result := r.shuffleShard(identifier, size, lookbackPeriod, now)
var result *Ring
if size <= 0 {
result = r.filterOutReadOnlyInstances(lookbackPeriod, now)
} else {
result = r.shuffleShard(identifier, size, lookbackPeriod, now)
}

if result != r {
r.setCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now, result)
Expand All @@ -756,6 +766,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
//
// If any instance had RegisteredTimestamp equal to 0 (it would not cause additional lookup of next instance),
// then r.oldestRegisteredTimestamp is zero too, and we skip this optimization.
//
// Even if some instances are read-only, they must have changed their read-only status within lookback window
// (because they were all registered within lookback window), so they would be included in the result.
if lookbackPeriod > 0 && r.oldestRegisteredTimestamp > 0 && r.oldestRegisteredTimestamp >= lookbackUntil {
return r
}
Expand All @@ -778,6 +791,19 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
var tokens []uint32

if r.cfg.ZoneAwarenessEnabled {
// If we're going to include all instances from this zone, we can simply filter out
// unwanted instances, and avoid iterating through tokens.
if numInstancesPerZone >= r.instancesCountPerZone[zone] {
for id, inst := range r.ringDesc.Ingesters {
if inst.Zone == zone && shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) {
shard[id] = inst
}
}

// We can go to the next zone, no need to iterate tokens.
continue
}

tokens = r.ringTokensByZone[zone]
} else {
// When zone-awareness is disabled, we just iterate over 1 single fake zone
Expand Down Expand Up @@ -819,11 +845,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
instanceID := info.InstanceID
instance := r.ringDesc.Ingesters[instanceID]

// The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded.
if lookbackPeriod == 0 && instance.ReadOnly {
if !shouldIncludeReadonlyInstanceInTheShard(instance, lookbackPeriod, lookbackUntil) {
continue
}

// Include instance in the subring.
shard[instanceID] = instance

Expand Down Expand Up @@ -855,7 +879,56 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
}
}

// Build a read-only ring for the shard.
return r.buildRingForTheShard(shard)
}

// shouldIncludeReadonlyInstanceInTheShard returns true if instance is not read-only, or when it is read-only and should be included in the shuffle shard.
func shouldIncludeReadonlyInstanceInTheShard(instance InstanceDesc, lookbackPeriod time.Duration, lookbackUntil int64) bool {
if !instance.ReadOnly {
return true
}
// The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read only instances excluded.
if lookbackPeriod == 0 {
return false
}
// With lookback period >0, read only instances are only included if they have not changed read-only status in the lookback window.
// If ReadOnlyUpdatedTimestamp is not set, we include the instance, and extend the shard later.
if lookbackPeriod > 0 && instance.ReadOnlyUpdatedTimestamp > 0 && instance.ReadOnlyUpdatedTimestamp < lookbackUntil {
return false
}
return true
}

// filterOutReadOnlyInstances removes all read-only instances from the ring, and returns the resulting ring.
func (r *Ring) filterOutReadOnlyInstances(lookbackPeriod time.Duration, now time.Time) *Ring {
lookbackUntil := now.Add(-lookbackPeriod).Unix()

r.mtx.RLock()
defer r.mtx.RUnlock()

// If there are no read-only instances, there's no need to do any filtering.
if r.readOnlyInstances != nil && *r.readOnlyInstances == 0 {
return r
}

// If all readOnlyUpdatedTimestamp values are within lookback window, we can return the ring without any filtering.
if lookbackPeriod > 0 && r.oldestReadOnlyUpdatedTimestamp != nil && *r.oldestReadOnlyUpdatedTimestamp >= lookbackUntil {
return r
}

shard := make(map[string]InstanceDesc, len(r.ringDesc.Ingesters))

for id, inst := range r.ringDesc.Ingesters {
if shouldIncludeReadonlyInstanceInTheShard(inst, lookbackPeriod, lookbackUntil) {
shard[id] = inst
}
}

return r.buildRingForTheShard(shard)
}

// buildRingForTheShard builds read-only ring for the shard (this ring won't be updated in the future).
func (r *Ring) buildRingForTheShard(shard map[string]InstanceDesc) *Ring {
shardDesc := &Desc{Ingesters: shard}
shardTokensByZone := shardDesc.getTokensByZone()
shardTokens := mergeTokenGroups(shardTokensByZone)
Expand Down
Loading

0 comments on commit 70f84e5

Please sign in to comment.