Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read only ingester support to lifecycler and ring #553

Merged
merged 2 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@
* [ENHANCEMENT] memberlist: Added `-<prefix>memberlist.broadcast-timeout-for-local-updates-on-shutdown` option to set timeout for sending locally-generated updates on shutdown, instead of previously hardcoded 10s (which is still the default). #539
* [ENHANCEMENT] tracing: add ExtractTraceSpanID function.
* [EHNANCEMENT] crypto/tls: Support reloading client certificates #537 #552
* [ENHANCEMENT] Add read only support for ingesters in the ring and lifecycler. #553
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
Expand Down
6 changes: 4 additions & 2 deletions ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ Rather, it's the delegate's responsibility to call [BasicLifecycler.ChangeState]
- The lifecycler will then periodically, based on the [ring.BasicLifecyclerConfig.TokensObservePeriod], attempt to verify that its tokens have been added to the ring, after which it will call [ring.BasicLifecyclerDelegate.OnRingInstanceTokens].
- The lifecycler will update they key/value store with heartbeats and state changes based on the [ring.BasicLifecyclerConfig.HeartbeatPeriod], calling [ring.BasicLifecyclerDelegate.OnRingInstanceHeartbeat] each time.
- When the BasicLifecycler is stopped, it will call [ring.BasicLifecyclerDelegate.OnRingInstanceStopping].

BasicLifecycler does not support read only instances for now.
*/
type BasicLifecycler struct {
*services.BasicService
Expand Down Expand Up @@ -316,7 +318,7 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error {
// Always overwrite the instance in the ring (even if already exists) because some properties
// may have changed (stated, tokens, zone, address) and even if they didn't the heartbeat at
// least did.
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt)
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt, false, time.Time{})
return ringDesc, true, nil
})

Expand Down Expand Up @@ -443,7 +445,7 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc,
// a resharding of tenants among instances: to guarantee query correctness we need to update the
// registration timestamp to current time.
registeredAt := time.Now()
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt)
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt, false, time.Time{})
}

prevTimestamp := instanceDesc.Timestamp
Expand Down
15 changes: 8 additions & 7 deletions ring/basic_lifecycler_delegates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestTokensPersistencyDelegate_ShouldHandleTheCaseTheInstanceIsAlreadyInTheR
// Add the instance to the ring.
require.NoError(t, store.CAS(ctx, testRingKey, func(interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
ringDesc.AddIngester(cfg.ID, cfg.Addr, cfg.Zone, testData.initialTokens, testData.initialState, registeredAt)
ringDesc.AddIngester(cfg.ID, cfg.Addr, cfg.Zone, testData.initialTokens, testData.initialState, registeredAt, false, time.Now())
return ringDesc, true, nil
}))

Expand Down Expand Up @@ -234,28 +234,29 @@ func TestDelegatesChain(t *testing.T) {
func TestAutoForgetDelegate(t *testing.T) {
const forgetPeriod = time.Minute
registeredAt := time.Now()
readOnlyUpdated := time.Time{}

tests := map[string]struct {
setup func(ringDesc *Desc)
expectedInstances []string
}{
"no unhealthy instance in the ring": {
setup: func(ringDesc *Desc) {
ringDesc.AddIngester("instance-1", "1.1.1.1", "", nil, ACTIVE, registeredAt)
ringDesc.AddIngester("instance-1", "1.1.1.1", "", nil, ACTIVE, registeredAt, false, readOnlyUpdated)
},
expectedInstances: []string{testInstanceID, "instance-1"},
},
"unhealthy instance in the ring that has NOTreached the forget period yet": {
setup: func(ringDesc *Desc) {
i := ringDesc.AddIngester("instance-1", "1.1.1.1", "", nil, ACTIVE, registeredAt)
i := ringDesc.AddIngester("instance-1", "1.1.1.1", "", nil, ACTIVE, registeredAt, false, readOnlyUpdated)
i.Timestamp = time.Now().Add(-forgetPeriod).Add(5 * time.Second).Unix()
ringDesc.Ingesters["instance-1"] = i
},
expectedInstances: []string{testInstanceID, "instance-1"},
},
"unhealthy instance in the ring that has reached the forget period": {
setup: func(ringDesc *Desc) {
i := ringDesc.AddIngester("instance-1", "1.1.1.1", "", nil, ACTIVE, registeredAt)
i := ringDesc.AddIngester("instance-1", "1.1.1.1", "", nil, ACTIVE, registeredAt, false, readOnlyUpdated)
i.Timestamp = time.Now().Add(-forgetPeriod).Add(-5 * time.Second).Unix()
ringDesc.Ingesters["instance-1"] = i
},
Expand Down Expand Up @@ -320,7 +321,7 @@ func TestInstanceRegisterDelegate_OnRingInstanceRegister(t *testing.T) {
otherIngesterTokens := []uint32{100, 200, 300, 400, 500}

desc := NewDesc()
desc.AddIngester("other-instance", "addr", "zone", otherIngesterTokens, ACTIVE, time.Now())
desc.AddIngester("other-instance", "addr", "zone", otherIngesterTokens, ACTIVE, time.Now(), false, time.Time{})

state, tokens := delegate.OnRingInstanceRegister(lifecycler, *desc, false, "test-instance", InstanceDesc{})
require.Equal(t, JOINING, state)
Expand All @@ -341,10 +342,10 @@ func TestInstanceRegisterDelegate_OnRingInstanceRegister(t *testing.T) {
otherIngesterTokens := []uint32{100, 200, 300, 400, 500}

desc := NewDesc()
desc.AddIngester("other-instance", "addr", "zone", otherIngesterTokens, ACTIVE, time.Now())
desc.AddIngester("other-instance", "addr", "zone", otherIngesterTokens, ACTIVE, time.Now(), false, time.Time{})

prevTokens := []uint32{10, 20, 30}
desc.AddIngester("test-instance", "test-addr", "zone", prevTokens, JOINING, time.Now())
desc.AddIngester("test-instance", "test-addr", "zone", prevTokens, JOINING, time.Now(), false, time.Time{})

state, tokens := delegate.OnRingInstanceRegister(lifecycler, *desc, true, "test-instance", desc.GetIngesters()["test-instance"])
require.Equal(t, ACTIVE, state)
Expand Down
19 changes: 17 additions & 2 deletions ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ func TestBasicLifecycler_RegisterOnStart(t *testing.T) {
registerState: ACTIVE,
registerTokens: Tokens{1, 2, 3, 4, 5},
},
"initial ring contains read only instance": {
initialInstanceID: testInstanceID,
initialInstanceDesc: &InstanceDesc{
Addr: "1.1.1.1",
State: ACTIVE,
Tokens: Tokens{1, 2, 3, 4, 5},
RegisteredTimestamp: time.Now().Add(-time.Hour).Unix(),
ReadOnly: true,
ReadOnlyUpdatedTimestamp: time.Now().Unix(),
},
registerState: ACTIVE,
registerTokens: Tokens{1, 2, 3, 4, 5},
},
}

for testName, testData := range tests {
Expand All @@ -118,7 +131,7 @@ func TestBasicLifecycler_RegisterOnStart(t *testing.T) {
desc := testData.initialInstanceDesc

ringDesc := GetOrCreateRingDesc(in)
ringDesc.AddIngester(testData.initialInstanceID, desc.Addr, desc.Zone, desc.Tokens, desc.State, desc.GetRegisteredAt())
ringDesc.AddIngester(testData.initialInstanceID, desc.Addr, desc.Zone, desc.Tokens, desc.State, desc.GetRegisteredAt(), desc.ReadOnly, time.Unix(desc.ReadOnlyUpdatedTimestamp, 0))
return ringDesc, true, nil
}))
}
Expand All @@ -138,6 +151,8 @@ func TestBasicLifecycler_RegisterOnStart(t *testing.T) {
assert.Equal(t, testData.initialInstanceDesc.State, instanceDesc.State)
assert.Equal(t, testData.initialInstanceDesc.Tokens, instanceDesc.Tokens)
assert.Equal(t, testData.initialInstanceDesc.RegisteredTimestamp, instanceDesc.RegisteredTimestamp)
assert.Equal(t, testData.initialInstanceDesc.ReadOnly, instanceDesc.ReadOnly)
assert.Equal(t, testData.initialInstanceDesc.ReadOnlyUpdatedTimestamp, instanceDesc.ReadOnlyUpdatedTimestamp)
} else {
assert.False(t, instanceExists)
}
Expand Down Expand Up @@ -463,7 +478,7 @@ func TestBasicLifecycler_TokensObservePeriod(t *testing.T) {
// Remove some tokens.
return store.CAS(ctx, testRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := GetOrCreateRingDesc(in)
ringDesc.AddIngester(testInstanceID, desc.Addr, desc.Zone, Tokens{4, 5}, desc.State, time.Now())
ringDesc.AddIngester(testInstanceID, desc.Addr, desc.Zone, Tokens{4, 5}, desc.State, time.Now(), false, time.Time{})
return ringDesc, true, nil
}) == nil
})
Expand Down
2 changes: 1 addition & 1 deletion ring/bench/ring_memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func BenchmarkMemberlistReceiveWithRingDesc(b *testing.B) {
{
for i := 0; i < numInstances; i++ {
tokens := generateUniqueTokens(i, numTokens)
initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", tokens, ring.ACTIVE, time.Now())
initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", tokens, ring.ACTIVE, time.Now(), false, time.Time{})
}
// Send a single update to populate the store.
msg := encodeMessage(b, "ring", initialDesc)
Expand Down
86 changes: 73 additions & 13 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ type Lifecycler struct {

// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
// goes away and comes back empty. The state changes during lifecycle of instance.
stateMtx sync.RWMutex
state InstanceState
tokens Tokens
registeredAt time.Time
stateMtx sync.RWMutex
state InstanceState
tokens Tokens
registeredAt time.Time
readOnly bool
readOnlyLastUpdated time.Time

// Controls the ready-reporting
readyLock sync.Mutex
Expand All @@ -161,6 +163,7 @@ type Lifecycler struct {
countersLock sync.RWMutex
healthyInstancesCount int
instancesCount int
readOnlyInstancesCount int
healthyInstancesInZoneCount int
instancesInZoneCount int
zonesCount int
Expand Down Expand Up @@ -349,6 +352,26 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error
return <-errCh
}

func (i *Lifecycler) ChangeReadOnlyState(ctx context.Context, readOnly bool) error {
errCh := make(chan error)
fn := func() {
prevReadOnly, _ := i.GetReadOnlyState()
if prevReadOnly == readOnly {
errCh <- nil
return
}

level.Info(i.logger).Log("msg", "changing read-only state of instance in the ring", "readOnly", readOnly, "ring", i.RingName)
i.setReadOnlyState(readOnly, time.Now())
errCh <- i.updateConsul(ctx)
}

if err := i.sendToLifecyclerLoop(fn); err != nil {
return err
}
return <-errCh
}

func (i *Lifecycler) getTokens() Tokens {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
Expand Down Expand Up @@ -379,6 +402,21 @@ func (i *Lifecycler) setRegisteredAt(registeredAt time.Time) {
i.registeredAt = registeredAt
}

// GetReadOnlyState returns the read-only state of this instance -- whether instance is read-only, and when what the last
// update of read-only state (possibly zero).
func (i *Lifecycler) GetReadOnlyState() (bool, time.Time) {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
return i.readOnly, i.readOnlyLastUpdated
}

func (i *Lifecycler) setReadOnlyState(readOnly bool, readOnlyLastUpdated time.Time) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()
i.readOnly = readOnly
i.readOnlyLastUpdated = readOnlyLastUpdated
}

// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
//
// For this method to work correctly (especially when using gossiping), source ingester (specified by
Expand Down Expand Up @@ -442,6 +480,14 @@ func (i *Lifecycler) InstancesCount() int {
return i.instancesCount
}

// ReadOnlyInstancesCount returns the total number of instances in the ring that are read only, updated during the last heartbeat period.
func (i *Lifecycler) ReadOnlyInstancesCount() int {
i.countersLock.RLock()
defer i.countersLock.RUnlock()

return i.readOnlyInstancesCount
}

// HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) HealthyInstancesInZoneCount() int {
Expand Down Expand Up @@ -629,32 +675,38 @@ func (i *Lifecycler) initRing(ctx context.Context) error {

instanceDesc, ok := ringDesc.Ingesters[i.ID]
if !ok {
// The instance doesn't exist in the ring, so it's safe to set the registered timestamp
// as of now.
registeredAt := time.Now()
i.setRegisteredAt(registeredAt)
now := time.Now()
// The instance doesn't exist in the ring, so it's safe to set the registered timestamp as of now.
i.setRegisteredAt(now)
// Clear read-only state, and set last update time to "now".
i.setReadOnlyState(false, now)

// We use the tokens from the file only if it does not exist in the ring yet.
if len(tokensFromFile) > 0 {
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
if len(tokensFromFile) >= i.cfg.NumTokens {
i.setState(ACTIVE)
}
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), i.getRegisteredAt(), ro, rots)
i.setTokens(tokensFromFile)
return ringDesc, true, nil
}

// Either we are a new ingester, or consul must have restarted
level.Info(i.logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), registeredAt)
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), i.getRegisteredAt(), ro, rots)
return ringDesc, true, nil
}

// The instance already exists in the ring, so we can't change the registered timestamp (even if it's zero)
// but we need to update the local state accordingly.
i.setRegisteredAt(instanceDesc.GetRegisteredAt())

// Set lifecycler read-only state from ring entry. We will not modify ring entry's read-only state.
i.setReadOnlyState(instanceDesc.GetReadOnlyState())

// If the ingester is in the JOINING state this means it crashed due to
// a failed token transfer or some other reason during startup. We want
// to set it back to PENDING in order to start the lifecycle from the
Expand Down Expand Up @@ -747,7 +799,8 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt())
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt(), ro, rots)

i.setTokens(ringTokens)

Expand Down Expand Up @@ -855,7 +908,8 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
sort.Sort(myTokens)
i.setTokens(myTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt(), ro, rots)
return ringDesc, true, nil
})

Expand Down Expand Up @@ -889,7 +943,8 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
tokens = instanceDesc.Tokens
}

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt())
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt(), ro, rots)
return ringDesc, true, nil
})

Expand Down Expand Up @@ -922,6 +977,7 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error
func (i *Lifecycler) updateCounters(ringDesc *Desc) {
healthyInstancesCount := 0
instancesCount := 0
readOnlyInstancesCount := 0
zones := map[string]int{}
healthyInstancesInZone := map[string]int{}

Expand All @@ -931,6 +987,9 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
for _, ingester := range ringDesc.Ingesters {
zones[ingester.Zone]++
instancesCount++
if ingester.ReadOnly {
readOnlyInstancesCount++
}

// Count the number of healthy instances for Write operation.
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {
Expand All @@ -944,6 +1003,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
i.countersLock.Lock()
i.healthyInstancesCount = healthyInstancesCount
i.instancesCount = instancesCount
i.readOnlyInstancesCount = readOnlyInstancesCount
i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone]
i.instancesInZoneCount = zones[i.cfg.Zone]
i.zonesCount = len(zones)
Expand Down
Loading