Skip to content

Commit

Permalink
Add support to ring for tracking writable instances per zone counts. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman authored Aug 14, 2024
1 parent a84ba7a commit e450497
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
* [ENHANCEMENT] tracing: add ExtractTraceSpanID function.
* [EHNANCEMENT] crypto/tls: Support reloading client certificates #537 #552
* [ENHANCEMENT] Add read only support for ingesters in the ring and lifecycler. #553
* [ENHANCEMENT] Added new ring methods to expose writable instances per zone counts. #560
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
Expand Down
24 changes: 24 additions & 0 deletions ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,30 @@ func (d *Desc) instancesWithTokensCountPerZone() map[string]int {
return instancesCountPerZone
}

func (d *Desc) writableInstancesCountPerZone() map[string]int {
instancesCountPerZone := map[string]int{}
if d != nil {
for _, ingester := range d.Ingesters {
if !ingester.ReadOnly {
instancesCountPerZone[ingester.Zone]++
}
}
}
return instancesCountPerZone
}

func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int {
instancesCountPerZone := map[string]int{}
if d != nil {
for _, ingester := range d.Ingesters {
if len(ingester.Tokens) > 0 && !ingester.ReadOnly {
instancesCountPerZone[ingester.Zone]++
}
}
}
return instancesCountPerZone
}

type CompareResult int

// CompareResult responses
Expand Down
52 changes: 43 additions & 9 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ type ReadRing interface {
// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens.
InstancesWithTokensInZoneCount(zone string) int

// WritableInstancesInZoneCount returns the number of writable instances in the ring that are registered in given zone.
WritableInstancesInZoneCount(zone string) int

// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
WritableInstancesWithTokensInZoneCount(zone string) int

// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
ZonesCount() int
}
Expand Down Expand Up @@ -204,6 +210,12 @@ type Ring struct {
// Nubmber of registered instances with tokens per zone.
instancesWithTokensCountPerZone map[string]int

// Number of registered instances per zone that are writable.
writableInstancesCountPerZone map[string]int

// Nubmber of registered instances with tokens per zone that are writable.
writableInstancesWithTokensCountPerZone map[string]int

// Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring
Expand Down Expand Up @@ -356,6 +368,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
instancesWithTokensCount := ringDesc.instancesWithTokensCount()
instancesCountPerZone := ringDesc.instancesCountPerZone()
instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone()
writableInstancesCountPerZone := ringDesc.writableInstancesCountPerZone()
writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone()

r.mtx.Lock()
defer r.mtx.Unlock()
Expand All @@ -367,6 +381,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.instancesWithTokensCount = instancesWithTokensCount
r.instancesCountPerZone = instancesCountPerZone
r.instancesWithTokensCountPerZone = instancesWithTokensCountPerZone
r.writableInstancesCountPerZone = writableInstancesCountPerZone
r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone
r.oldestRegisteredTimestamp = oldestRegisteredTimestamp
r.lastTopologyChange = now

Expand Down Expand Up @@ -823,15 +839,17 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
shardTokens := mergeTokenGroups(shardTokensByZone)

return &Ring{
cfg: r.cfg,
strategy: r.strategy,
ringDesc: shardDesc,
ringTokens: shardTokens,
ringTokensByZone: shardTokensByZone,
ringZones: getZones(shardTokensByZone),
instancesWithTokensCount: shardDesc.instancesWithTokensCount(),
instancesCountPerZone: shardDesc.instancesCountPerZone(),
instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(),
cfg: r.cfg,
strategy: r.strategy,
ringDesc: shardDesc,
ringTokens: shardTokens,
ringTokensByZone: shardTokensByZone,
ringZones: getZones(shardTokensByZone),
instancesWithTokensCount: shardDesc.instancesWithTokensCount(),
instancesCountPerZone: shardDesc.instancesCountPerZone(),
instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(),
writableInstancesCountPerZone: shardDesc.writableInstancesCountPerZone(),
writableInstancesWithTokensCountPerZone: shardDesc.writableInstancesWithTokensCountPerZone(),

oldestRegisteredTimestamp: shardDesc.getOldestRegisteredTimestamp(),

Expand Down Expand Up @@ -1140,6 +1158,22 @@ func (r *Ring) InstancesWithTokensInZoneCount(zone string) int {
return r.instancesWithTokensCountPerZone[zone]
}

// WritableInstancesInZoneCount returns the number of writable instances in the ring that are registered in given zone.
func (r *Ring) WritableInstancesInZoneCount(zone string) int {
r.mtx.RLock()
defer r.mtx.RUnlock()

return r.writableInstancesCountPerZone[zone]
}

// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
func (r *Ring) WritableInstancesWithTokensInZoneCount(zone string) int {
r.mtx.RLock()
defer r.mtx.RUnlock()

return r.writableInstancesWithTokensCountPerZone[zone]
}

func (r *Ring) ZonesCount() int {
r.mtx.RLock()
defer r.mtx.RUnlock()
Expand Down
82 changes: 82 additions & 0 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,88 @@ func TestRing_GetInstancesWithTokensCounts(t *testing.T) {
}
}

func TestRing_GetWritableInstancesWithTokensCounts(t *testing.T) {
gen := initTokenGenerator(t)

tests := map[string]struct {
ringInstances map[string]InstanceDesc
expectedWritableInstancesCountPerZone map[string]int
expectedWritableInstancesWithTokensCountPerZone map[string]int
}{
"empty ring": {
ringInstances: nil,
expectedWritableInstancesCountPerZone: map[string]int{},
expectedWritableInstancesWithTokensCountPerZone: map[string]int{},
},
"single zone, no tokens": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{}},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: LEAVING, Tokens: []uint32{}, ReadOnly: true},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: PENDING, Tokens: []uint32{}},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: JOINING, Tokens: []uint32{}},
},
expectedWritableInstancesCountPerZone: map[string]int{"zone-a": 3},
expectedWritableInstancesWithTokensCountPerZone: map[string]int{"zone-a": 0},
},
"single zone, some tokens": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: gen.GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{}},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: gen.GenerateTokens(128, nil)},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: LEAVING, Tokens: []uint32{}},
"instance-5": {Addr: "127.0.0.5", Zone: "zone-a", State: PENDING, Tokens: gen.GenerateTokens(128, nil)},
"instance-6": {Addr: "127.0.0.6", Zone: "zone-a", State: PENDING, Tokens: []uint32{}, ReadOnly: true},
"instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: JOINING, Tokens: gen.GenerateTokens(128, nil), ReadOnly: true},
"instance-8": {Addr: "127.0.0.8", Zone: "zone-a", State: JOINING, Tokens: []uint32{}, ReadOnly: true},
},
expectedWritableInstancesCountPerZone: map[string]int{"zone-a": 5},
expectedWritableInstancesWithTokensCountPerZone: map[string]int{"zone-a": 3},
},
"multiple zones": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: gen.GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{}, ReadOnly: true},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: LEAVING, Tokens: gen.GenerateTokens(128, nil), ReadOnly: true},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: LEAVING, Tokens: []uint32{}},
"instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: PENDING, Tokens: gen.GenerateTokens(128, nil)},
"instance-6": {Addr: "127.0.0.6", Zone: "zone-d", State: PENDING, Tokens: []uint32{}},
"instance-7": {Addr: "127.0.0.7", Zone: "zone-c", State: JOINING, Tokens: gen.GenerateTokens(128, nil)},
"instance-8": {Addr: "127.0.0.8", Zone: "zone-d", State: JOINING, Tokens: []uint32{}, ReadOnly: true},
},
expectedWritableInstancesCountPerZone: map[string]int{"zone-a": 1, "zone-b": 1, "zone-c": 2, "zone-d": 1},
expectedWritableInstancesWithTokensCountPerZone: map[string]int{"zone-a": 1, "zone-b": 0, "zone-c": 2, "zone-d": 0},
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Init the ring.
ringDesc := &Desc{Ingesters: testData.ringInstances}
for id, instance := range ringDesc.Ingesters {
instance.Timestamp = time.Now().Unix()
ringDesc.Ingesters[id] = instance
}

ring := Ring{
cfg: Config{
HeartbeatTimeout: time.Hour,
ZoneAwarenessEnabled: true,
},
ringDesc: ringDesc,
writableInstancesCountPerZone: ringDesc.writableInstancesCountPerZone(),
writableInstancesWithTokensCountPerZone: ringDesc.writableInstancesWithTokensCountPerZone(),
}

for z, instances := range testData.expectedWritableInstancesCountPerZone {
assert.Equal(t, instances, ring.WritableInstancesInZoneCount(z))
}
for z, instances := range testData.expectedWritableInstancesWithTokensCountPerZone {
assert.Equal(t, instances, ring.WritableInstancesWithTokensInZoneCount(z))
}
})
}
}

func TestRing_ShuffleShard(t *testing.T) {
gen := initTokenGenerator(t)

Expand Down
8 changes: 8 additions & 0 deletions ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ func (r *RingMock) InstancesWithTokensInZoneCount(_ string) int {
return 0
}

func (r *RingMock) WritableInstancesInZoneCount(_ string) int {
return 0
}

func (r *RingMock) WritableInstancesWithTokensInZoneCount(_ string) int {
return 0
}

func (r *RingMock) ZonesCount() int {
return 0
}
Expand Down

0 comments on commit e450497

Please sign in to comment.