Skip to content

Commit

Permalink
fix logic in ring lifecycler.go initRing and model.go mergeWithTime f…
Browse files Browse the repository at this point in the history
…or pointer usage, fix compilation errors in lifecycler_test.go and merge_test.go; pass changed basic_lifecycler_test.go to use pointers; opt for all pointer access; pass all mentioned tests with race check
  • Loading branch information
francoposa committed Jan 14, 2025
1 parent 9d0f2a2 commit a05aa99
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 58 deletions.
8 changes: 4 additions & 4 deletions ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,16 +588,16 @@ func (m *mockDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ring
}
}

func getInstanceFromStore(t *testing.T, store kv.Client, instanceID string) (InstanceDesc, bool) {
func getInstanceFromStore(t *testing.T, store kv.Client, instanceID string) (*InstanceDesc, bool) {
out, err := store.Get(context.Background(), testRingKey)
require.NoError(t, err)

if out == nil {
return InstanceDesc{}, false
return nil, false
}

ringDesc := out.(*Desc)
instanceDesc, ok := ringDesc.GetIngesterVal(instanceID)
instanceDesc := ringDesc.GetIngester(instanceID)

return instanceDesc, ok
return instanceDesc, instanceDesc != nil
}
7 changes: 6 additions & 1 deletion ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return ringDesc, true, nil
}

// instanceDesc is a pointer to the value in the ringDesc.Ingesters map,
// so updates to it will be reflected in ringDesc.Ingesters[i.ID];
// hold a copy of the original state so we can determine if we need to update the ring's backing store
originalInstanceDesc := *instanceDesc

// The instance already exists in the ring, so we can't change the registered timestamp (even if it's zero)
// but we need to update the local state accordingly.
i.setRegisteredAt(instanceDesc.GetRegisteredAt())
Expand Down Expand Up @@ -765,7 +770,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {

// Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat
// can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady.
if !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) {
if !instanceDesc.Equal(originalInstanceDesc) {
// Update timestamp to give gossiping client a chance register ring change.
instanceDesc.Timestamp = time.Now().Unix()
ringDesc.Ingesters[i.ID] = instanceDesc
Expand Down
33 changes: 19 additions & 14 deletions ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) {
})

// Verify ingester joined, is active, and has 128 tokens
var ingDesc InstanceDesc
var ingDesc *InstanceDesc
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)
Expand Down Expand Up @@ -848,7 +848,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) {
})

// Verify ingester joined, is active, and has 64 tokens
var ingDesc InstanceDesc
var ingDesc *InstanceDesc
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)
Expand Down Expand Up @@ -1008,7 +1008,7 @@ func TestCheckReady_MinReadyDuration(t *testing.T) {
startTime := time.Now()

// Wait until the instance is InstanceState_ACTIVE and healthy in the ring.
waitRingInstance(t, 3*time.Second, l, func(instance InstanceDesc) error {
waitRingInstance(t, 3*time.Second, l, func(instance *InstanceDesc) error {
return instance.IsReady(time.Now(), cfg.RingConfig.HeartbeatTimeout)
})

Expand Down Expand Up @@ -1096,8 +1096,8 @@ func TestCheckReady_CheckRingHealth(t *testing.T) {

// Wait until both instances are registered in the ring. We expect them to be registered
// immediately and then switch to InstanceState_ACTIVE after the configured auto join delay.
waitRingInstance(t, 3*time.Second, l1, func(InstanceDesc) error { return nil })
waitRingInstance(t, 3*time.Second, l2, func(InstanceDesc) error { return nil })
waitRingInstance(t, 3*time.Second, l1, func(*InstanceDesc) error { return nil })
waitRingInstance(t, 3*time.Second, l2, func(*InstanceDesc) error { return nil })

// Poll the readiness check until ready and measure how much time it takes.
test.Poll(t, 5*time.Second, nil, func() interface{} {
Expand Down Expand Up @@ -1130,8 +1130,8 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

// poll function waits for a condition and returning actual state of the ingesters after the condition succeed.
poll := func(condition func(*Desc) bool) map[string]InstanceDesc {
var ingesters map[string]InstanceDesc
poll := func(condition func(*Desc) bool) map[string]*InstanceDesc {
var ingesters map[string]*InstanceDesc
test.Poll(t, 5*time.Second, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
Expand All @@ -1157,7 +1157,10 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
poll(func(desc *Desc) bool {
return desc.Ingesters[ingId].State == InstanceState_ACTIVE
if ing := desc.GetIngester(ingId); ing != nil {
return ing.State == InstanceState_ACTIVE
}
return false
})
return lifecycler
}
Expand Down Expand Up @@ -1332,7 +1335,9 @@ func TestTokensOnDisk(t *testing.T) {
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
actTokens = desc.Ingesters["ing2"].Tokens
if ing := desc.GetIngester("ing2"); ing != nil {
actTokens = ing.Tokens
}
}
return ok &&
len(desc.Ingesters) == 1 &&
Expand Down Expand Up @@ -1416,7 +1421,7 @@ func TestJoinInLeavingState(t *testing.T) {
// Set state as InstanceState_LEAVING
err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) {
r := &Desc{
Ingesters: map[string]InstanceDesc{
Ingesters: map[string]*InstanceDesc{
"ing1": {
State: InstanceState_LEAVING,
Tokens: []uint32{1, 4},
Expand Down Expand Up @@ -1473,7 +1478,7 @@ func TestJoinInJoiningState(t *testing.T) {
// Set state as InstanceState_JOINING
err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) {
r := &Desc{
Ingesters: map[string]InstanceDesc{
Ingesters: map[string]*InstanceDesc{
"ing1": {
State: InstanceState_JOINING,
Tokens: []uint32{1, 4},
Expand Down Expand Up @@ -1528,7 +1533,7 @@ func TestWaitBeforeJoining(t *testing.T) {

err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) {
r := &Desc{
Ingesters: map[string]InstanceDesc{
Ingesters: map[string]*InstanceDesc{
instanceName(0, 1): {
State: InstanceState_ACTIVE,
Tokens: []uint32{1, 2, 3},
Expand Down Expand Up @@ -1738,7 +1743,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
// Set ing1 to not have a zone
err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) {
r := &Desc{
Ingesters: map[string]InstanceDesc{
Ingesters: map[string]*InstanceDesc{
"ing1": {
State: InstanceState_ACTIVE,
Addr: "0.0.0.0",
Expand Down Expand Up @@ -1772,7 +1777,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) {
})
}

func waitRingInstance(t *testing.T, timeout time.Duration, l *Lifecycler, check func(instance InstanceDesc) error) {
func waitRingInstance(t *testing.T, timeout time.Duration, l *Lifecycler, check func(instance *InstanceDesc) error) {
test.Poll(t, timeout, nil, func() interface{} {
desc, err := l.KVStore.Get(context.Background(), l.RingKey)
if err != nil {
Expand Down
Loading

0 comments on commit a05aa99

Please sign in to comment.