diff --git a/CHANGELOG.md b/CHANGELOG.md index e144f3552..f979ade32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -220,6 +220,7 @@ * [ENHANCEMENT] tracing: add ExtractTraceSpanID function. * [EHNANCEMENT] crypto/tls: Support reloading client certificates #537 #552 * [ENHANCEMENT] Add read only support for ingesters in the ring and lifecycler. #553 +* [ENHANCEMENT] Added new ring methods to expose writable instances per zone counts. #560 * [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 diff --git a/ring/model.go b/ring/model.go index 08f1d7690..bf86b2965 100644 --- a/ring/model.go +++ b/ring/model.go @@ -570,6 +570,30 @@ func (d *Desc) instancesWithTokensCountPerZone() map[string]int { return instancesCountPerZone } +func (d *Desc) writableInstancesCountPerZone() map[string]int { + instancesCountPerZone := map[string]int{} + if d != nil { + for _, ingester := range d.Ingesters { + if !ingester.ReadOnly { + instancesCountPerZone[ingester.Zone]++ + } + } + } + return instancesCountPerZone +} + +func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int { + instancesCountPerZone := map[string]int{} + if d != nil { + for _, ingester := range d.Ingesters { + if len(ingester.Tokens) > 0 && !ingester.ReadOnly { + instancesCountPerZone[ingester.Zone]++ + } + } + } + return instancesCountPerZone +} + type CompareResult int // CompareResult responses diff --git a/ring/ring.go b/ring/ring.go index 60e163aeb..9ab80f853 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -87,6 +87,12 @@ type ReadRing interface { // InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens. InstancesWithTokensInZoneCount(zone string) int + // WritableInstancesInZoneCount returns the number of writable instances in the ring that are registered in given zone. + WritableInstancesInZoneCount(zone string) int + + // WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. + WritableInstancesWithTokensInZoneCount(zone string) int + // ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring. ZonesCount() int } @@ -204,6 +210,12 @@ type Ring struct { // Nubmber of registered instances with tokens per zone. instancesWithTokensCountPerZone map[string]int + // Number of registered instances per zone that are writable. + writableInstancesCountPerZone map[string]int + + // Nubmber of registered instances with tokens per zone that are writable. + writableInstancesWithTokensCountPerZone map[string]int + // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes. // If set to nil, no caching is done (used by tests, and subrings). shuffledSubringCache map[subringCacheKey]*Ring @@ -356,6 +368,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) { instancesWithTokensCount := ringDesc.instancesWithTokensCount() instancesCountPerZone := ringDesc.instancesCountPerZone() instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone() + writableInstancesCountPerZone := ringDesc.writableInstancesCountPerZone() + writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone() r.mtx.Lock() defer r.mtx.Unlock() @@ -367,6 +381,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.instancesWithTokensCount = instancesWithTokensCount r.instancesCountPerZone = instancesCountPerZone r.instancesWithTokensCountPerZone = instancesWithTokensCountPerZone + r.writableInstancesCountPerZone = writableInstancesCountPerZone + r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone r.oldestRegisteredTimestamp = oldestRegisteredTimestamp r.lastTopologyChange = now @@ -823,15 +839,17 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur shardTokens := mergeTokenGroups(shardTokensByZone) return &Ring{ - cfg: r.cfg, - strategy: r.strategy, - ringDesc: shardDesc, - ringTokens: shardTokens, - ringTokensByZone: shardTokensByZone, - ringZones: getZones(shardTokensByZone), - instancesWithTokensCount: shardDesc.instancesWithTokensCount(), - instancesCountPerZone: shardDesc.instancesCountPerZone(), - instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(), + cfg: r.cfg, + strategy: r.strategy, + ringDesc: shardDesc, + ringTokens: shardTokens, + ringTokensByZone: shardTokensByZone, + ringZones: getZones(shardTokensByZone), + instancesWithTokensCount: shardDesc.instancesWithTokensCount(), + instancesCountPerZone: shardDesc.instancesCountPerZone(), + instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(), + writableInstancesCountPerZone: shardDesc.writableInstancesCountPerZone(), + writableInstancesWithTokensCountPerZone: shardDesc.writableInstancesWithTokensCountPerZone(), oldestRegisteredTimestamp: shardDesc.getOldestRegisteredTimestamp(), @@ -1140,6 +1158,22 @@ func (r *Ring) InstancesWithTokensInZoneCount(zone string) int { return r.instancesWithTokensCountPerZone[zone] } +// WritableInstancesInZoneCount returns the number of writable instances in the ring that are registered in given zone. +func (r *Ring) WritableInstancesInZoneCount(zone string) int { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.writableInstancesCountPerZone[zone] +} + +// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens. +func (r *Ring) WritableInstancesWithTokensInZoneCount(zone string) int { + r.mtx.RLock() + defer r.mtx.RUnlock() + + return r.writableInstancesWithTokensCountPerZone[zone] +} + func (r *Ring) ZonesCount() int { r.mtx.RLock() defer r.mtx.RUnlock() diff --git a/ring/ring_test.go b/ring/ring_test.go index ee4ab104f..440f24d5f 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -1254,6 +1254,88 @@ func TestRing_GetInstancesWithTokensCounts(t *testing.T) { } } +func TestRing_GetWritableInstancesWithTokensCounts(t *testing.T) { + gen := initTokenGenerator(t) + + tests := map[string]struct { + ringInstances map[string]InstanceDesc + expectedWritableInstancesCountPerZone map[string]int + expectedWritableInstancesWithTokensCountPerZone map[string]int + }{ + "empty ring": { + ringInstances: nil, + expectedWritableInstancesCountPerZone: map[string]int{}, + expectedWritableInstancesWithTokensCountPerZone: map[string]int{}, + }, + "single zone, no tokens": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{}}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: LEAVING, Tokens: []uint32{}, ReadOnly: true}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: PENDING, Tokens: []uint32{}}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: JOINING, Tokens: []uint32{}}, + }, + expectedWritableInstancesCountPerZone: map[string]int{"zone-a": 3}, + expectedWritableInstancesWithTokensCountPerZone: map[string]int{"zone-a": 0}, + }, + "single zone, some tokens": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: gen.GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{}}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: gen.GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: LEAVING, Tokens: []uint32{}}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-a", State: PENDING, Tokens: gen.GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-a", State: PENDING, Tokens: []uint32{}, ReadOnly: true}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: JOINING, Tokens: gen.GenerateTokens(128, nil), ReadOnly: true}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-a", State: JOINING, Tokens: []uint32{}, ReadOnly: true}, + }, + expectedWritableInstancesCountPerZone: map[string]int{"zone-a": 5}, + expectedWritableInstancesWithTokensCountPerZone: map[string]int{"zone-a": 3}, + }, + "multiple zones": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: gen.GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{}, ReadOnly: true}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: LEAVING, Tokens: gen.GenerateTokens(128, nil), ReadOnly: true}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: LEAVING, Tokens: []uint32{}}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: PENDING, Tokens: gen.GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-d", State: PENDING, Tokens: []uint32{}}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-c", State: JOINING, Tokens: gen.GenerateTokens(128, nil)}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-d", State: JOINING, Tokens: []uint32{}, ReadOnly: true}, + }, + expectedWritableInstancesCountPerZone: map[string]int{"zone-a": 1, "zone-b": 1, "zone-c": 2, "zone-d": 1}, + expectedWritableInstancesWithTokensCountPerZone: map[string]int{"zone-a": 1, "zone-b": 0, "zone-c": 2, "zone-d": 0}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Init the ring. + ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + instance.Timestamp = time.Now().Unix() + ringDesc.Ingesters[id] = instance + } + + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, + ringDesc: ringDesc, + writableInstancesCountPerZone: ringDesc.writableInstancesCountPerZone(), + writableInstancesWithTokensCountPerZone: ringDesc.writableInstancesWithTokensCountPerZone(), + } + + for z, instances := range testData.expectedWritableInstancesCountPerZone { + assert.Equal(t, instances, ring.WritableInstancesInZoneCount(z)) + } + for z, instances := range testData.expectedWritableInstancesWithTokensCountPerZone { + assert.Equal(t, instances, ring.WritableInstancesWithTokensInZoneCount(z)) + } + }) + } +} + func TestRing_ShuffleShard(t *testing.T) { gen := initTokenGenerator(t) diff --git a/ring/util_test.go b/ring/util_test.go index 3403adb4d..91dfc9b41 100644 --- a/ring/util_test.go +++ b/ring/util_test.go @@ -83,6 +83,14 @@ func (r *RingMock) InstancesWithTokensInZoneCount(_ string) int { return 0 } +func (r *RingMock) WritableInstancesInZoneCount(_ string) int { + return 0 +} + +func (r *RingMock) WritableInstancesWithTokensInZoneCount(_ string) int { + return 0 +} + func (r *RingMock) ZonesCount() int { return 0 }