From a05aa999983863caa79b4f3522040d13cbd392d2 Mon Sep 17 00:00:00 2001 From: francoposa Date: Mon, 13 Jan 2025 15:38:33 -0800 Subject: [PATCH] fix logic in ring lifecycler.go initRing and model.go mergeWithTime for pointer usage, fix compilation errors in lifecycler_test.go and merge_test.go; pass changed basic_lifecycler_test.go to use pointers; opt for all pointer access; pass all mentioned tests with race check --- ring/basic_lifecycler_test.go | 8 ++-- ring/lifecycler.go | 7 +++- ring/lifecycler_test.go | 33 ++++++++------- ring/merge_test.go | 78 +++++++++++++++++------------------ ring/model.go | 4 ++ 5 files changed, 72 insertions(+), 58 deletions(-) diff --git a/ring/basic_lifecycler_test.go b/ring/basic_lifecycler_test.go index 10480bbf9..369704640 100644 --- a/ring/basic_lifecycler_test.go +++ b/ring/basic_lifecycler_test.go @@ -588,16 +588,16 @@ func (m *mockDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ring } } -func getInstanceFromStore(t *testing.T, store kv.Client, instanceID string) (InstanceDesc, bool) { +func getInstanceFromStore(t *testing.T, store kv.Client, instanceID string) (*InstanceDesc, bool) { out, err := store.Get(context.Background(), testRingKey) require.NoError(t, err) if out == nil { - return InstanceDesc{}, false + return nil, false } ringDesc := out.(*Desc) - instanceDesc, ok := ringDesc.GetIngesterVal(instanceID) + instanceDesc := ringDesc.GetIngester(instanceID) - return instanceDesc, ok + return instanceDesc, instanceDesc != nil } diff --git a/ring/lifecycler.go b/ring/lifecycler.go index b4be2a1c5..521d483fe 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -707,6 +707,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error { return ringDesc, true, nil } + // instanceDesc is a pointer to the value in the ringDesc.Ingesters map, + // so updates to it will be reflected in ringDesc.Ingesters[i.ID]; + // hold a copy of the original state so we can determine if we need to update the ring's backing store + originalInstanceDesc := *instanceDesc + // The instance already exists in the ring, so we can't change the registered timestamp (even if it's zero) // but we need to update the local state accordingly. i.setRegisteredAt(instanceDesc.GetRegisteredAt()) @@ -765,7 +770,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat // can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady. - if !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) { + if !instanceDesc.Equal(originalInstanceDesc) { // Update timestamp to give gossiping client a chance register ring change. instanceDesc.Timestamp = time.Now().Unix() ringDesc.Ingesters[i.ID] = instanceDesc diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go index a1a147f2c..9e1e286cd 100644 --- a/ring/lifecycler_test.go +++ b/ring/lifecycler_test.go @@ -670,7 +670,7 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) { }) // Verify ingester joined, is active, and has 128 tokens - var ingDesc InstanceDesc + var ingDesc *InstanceDesc test.Poll(t, time.Second, true, func() interface{} { d, err := r.KVClient.Get(ctx, ringKey) require.NoError(t, err) @@ -848,7 +848,7 @@ func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) { }) // Verify ingester joined, is active, and has 64 tokens - var ingDesc InstanceDesc + var ingDesc *InstanceDesc test.Poll(t, time.Second, true, func() interface{} { d, err := r.KVClient.Get(ctx, ringKey) require.NoError(t, err) @@ -1008,7 +1008,7 @@ func TestCheckReady_MinReadyDuration(t *testing.T) { startTime := time.Now() // Wait until the instance is InstanceState_ACTIVE and healthy in the ring. - waitRingInstance(t, 3*time.Second, l, func(instance InstanceDesc) error { + waitRingInstance(t, 3*time.Second, l, func(instance *InstanceDesc) error { return instance.IsReady(time.Now(), cfg.RingConfig.HeartbeatTimeout) }) @@ -1096,8 +1096,8 @@ func TestCheckReady_CheckRingHealth(t *testing.T) { // Wait until both instances are registered in the ring. We expect them to be registered // immediately and then switch to InstanceState_ACTIVE after the configured auto join delay. - waitRingInstance(t, 3*time.Second, l1, func(InstanceDesc) error { return nil }) - waitRingInstance(t, 3*time.Second, l2, func(InstanceDesc) error { return nil }) + waitRingInstance(t, 3*time.Second, l1, func(*InstanceDesc) error { return nil }) + waitRingInstance(t, 3*time.Second, l2, func(*InstanceDesc) error { return nil }) // Poll the readiness check until ready and measure how much time it takes. test.Poll(t, 5*time.Second, nil, func() interface{} { @@ -1130,8 +1130,8 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck // poll function waits for a condition and returning actual state of the ingesters after the condition succeed. - poll := func(condition func(*Desc) bool) map[string]InstanceDesc { - var ingesters map[string]InstanceDesc + poll := func(condition func(*Desc) bool) map[string]*InstanceDesc { + var ingesters map[string]*InstanceDesc test.Poll(t, 5*time.Second, true, func() interface{} { d, err := r.KVClient.Get(context.Background(), ringKey) require.NoError(t, err) @@ -1157,7 +1157,10 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler)) poll(func(desc *Desc) bool { - return desc.Ingesters[ingId].State == InstanceState_ACTIVE + if ing := desc.GetIngester(ingId); ing != nil { + return ing.State == InstanceState_ACTIVE + } + return false }) return lifecycler } @@ -1332,7 +1335,9 @@ func TestTokensOnDisk(t *testing.T) { require.NoError(t, err) desc, ok := d.(*Desc) if ok { - actTokens = desc.Ingesters["ing2"].Tokens + if ing := desc.GetIngester("ing2"); ing != nil { + actTokens = ing.Tokens + } } return ok && len(desc.Ingesters) == 1 && @@ -1416,7 +1421,7 @@ func TestJoinInLeavingState(t *testing.T) { // Set state as InstanceState_LEAVING err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) { r := &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "ing1": { State: InstanceState_LEAVING, Tokens: []uint32{1, 4}, @@ -1473,7 +1478,7 @@ func TestJoinInJoiningState(t *testing.T) { // Set state as InstanceState_JOINING err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) { r := &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "ing1": { State: InstanceState_JOINING, Tokens: []uint32{1, 4}, @@ -1528,7 +1533,7 @@ func TestWaitBeforeJoining(t *testing.T) { err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) { r := &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ instanceName(0, 1): { State: InstanceState_ACTIVE, Tokens: []uint32{1, 2, 3}, @@ -1738,7 +1743,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) { // Set ing1 to not have a zone err = r.KVClient.CAS(context.Background(), ringKey, func(interface{}) (interface{}, bool, error) { r := &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "ing1": { State: InstanceState_ACTIVE, Addr: "0.0.0.0", @@ -1772,7 +1777,7 @@ func TestRestoreOfZoneWhenOverwritten(t *testing.T) { }) } -func waitRingInstance(t *testing.T, timeout time.Duration, l *Lifecycler, check func(instance InstanceDesc) error) { +func waitRingInstance(t *testing.T, timeout time.Duration, l *Lifecycler, check func(instance *InstanceDesc) error) { test.Poll(t, timeout, nil, func() interface{} { desc, err := l.KVStore.Get(context.Background(), l.RingKey) if err != nil { diff --git a/ring/merge_test.go b/ring/merge_test.go index 60f7c5c04..7bc589ab5 100644 --- a/ring/merge_test.go +++ b/ring/merge_test.go @@ -11,7 +11,7 @@ func TestNormalizationAndConflictResolution(t *testing.T) { now := time.Now().Unix() first := &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{50, 40, 40, 30}}, "Ing 2": {Addr: "addr2", Timestamp: 123456, State: InstanceState_LEAVING, Tokens: []uint32{100, 5, 5, 100, 100, 200, 20, 10}}, "Ing 3": {Addr: "addr3", Timestamp: now, State: InstanceState_LEFT, Tokens: []uint32{100, 200, 300}}, @@ -21,7 +21,7 @@ func TestNormalizationAndConflictResolution(t *testing.T) { } second := &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Unknown": { Timestamp: now + 10, Tokens: []uint32{1000, 2000}, @@ -39,7 +39,7 @@ func TestNormalizationAndConflictResolution(t *testing.T) { } assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: 123456, State: InstanceState_LEAVING, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now, State: InstanceState_LEFT}, @@ -50,7 +50,7 @@ func TestNormalizationAndConflictResolution(t *testing.T) { assert.Equal(t, &Desc{ // change ring is always normalized, "Unknown" ingester has lost two tokens: 100 from first ring (because of second ring), and 1000 (conflict resolution) - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Unknown": {Timestamp: now + 10, Tokens: []uint32{1000, 2000}}, }, }, changeRing) @@ -75,7 +75,7 @@ func TestMerge(t *testing.T) { firstRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_JOINING, Tokens: []uint32{5, 10, 20, 100, 200}}, }, @@ -84,7 +84,7 @@ func TestMerge(t *testing.T) { secondRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 3": {Addr: "addr3", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{150, 250, 350}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, }, @@ -93,7 +93,7 @@ func TestMerge(t *testing.T) { thirdRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_LEAVING, Tokens: []uint32{30, 40, 50}}, "Ing 3": {Addr: "addr3", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{150, 250, 350}}, }, @@ -102,7 +102,7 @@ func TestMerge(t *testing.T) { expectedFirstSecondMerge := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{150, 250, 350}}, @@ -112,7 +112,7 @@ func TestMerge(t *testing.T) { expectedFirstSecondThirdMerge := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_LEAVING, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{150, 250, 350}}, @@ -122,7 +122,7 @@ func TestMerge(t *testing.T) { fourthRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_LEFT, Tokens: []uint32{30, 40, 50}}, }, } @@ -130,7 +130,7 @@ func TestMerge(t *testing.T) { expectedFirstSecondThirdFourthMerge := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_LEFT, Tokens: nil}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{150, 250, 350}}, @@ -155,7 +155,7 @@ func TestMerge(t *testing.T) { assert.Equal(t, expectedFirstSecondMerge(), our) // when merging first into second ring, only "Ing 1" is new assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, }, }, ch) @@ -176,7 +176,7 @@ func TestMerge(t *testing.T) { assert.Equal(t, expectedFirstSecondThirdFourthMerge(), out) // entire fourth ring is the update -- but without tokens assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_LEFT, Tokens: nil}, }, }, ch) @@ -188,7 +188,7 @@ func TestTokensTakeover(t *testing.T) { first := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_JOINING, Tokens: []uint32{5, 10, 20}}, // partially migrated from Ing 3 }, @@ -197,7 +197,7 @@ func TestTokensTakeover(t *testing.T) { second := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20}}, "Ing 3": {Addr: "addr3", Timestamp: now + 5, State: InstanceState_LEAVING, Tokens: []uint32{5, 10, 20, 100, 200}}, }, @@ -206,7 +206,7 @@ func TestTokensTakeover(t *testing.T) { merged := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20}}, "Ing 3": {Addr: "addr3", Timestamp: now + 5, State: InstanceState_LEAVING, Tokens: []uint32{100, 200}}, @@ -218,7 +218,7 @@ func TestTokensTakeover(t *testing.T) { our, ch := merge(first(), second()) assert.Equal(t, merged(), our) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20}}, "Ing 3": {Addr: "addr3", Timestamp: now + 5, State: InstanceState_LEAVING, Tokens: []uint32{100, 200}}, // change doesn't contain conflicted tokens }, @@ -237,7 +237,7 @@ func TestTokensTakeover(t *testing.T) { // change is different though assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, }, }, ch) @@ -249,7 +249,7 @@ func TestMergeLeft(t *testing.T) { firstRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_JOINING, Tokens: []uint32{5, 10, 20, 100, 200}}, }, @@ -259,7 +259,7 @@ func TestMergeLeft(t *testing.T) { // Not normalised because it contains duplicate and unsorted tokens. firstRingNotNormalised := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_JOINING, Tokens: []uint32{20, 10, 5, 10, 20, 100, 200, 100}}, }, @@ -268,7 +268,7 @@ func TestMergeLeft(t *testing.T) { secondRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_LEFT}, }, } @@ -277,7 +277,7 @@ func TestMergeLeft(t *testing.T) { // Not normalised because it contains a InstanceState_LEFT ingester with tokens. secondRingNotNormalised := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_LEFT, Tokens: []uint32{5, 10, 20, 100, 200}}, }, } @@ -285,7 +285,7 @@ func TestMergeLeft(t *testing.T) { expectedFirstSecondMerge := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_LEFT}, }, @@ -294,7 +294,7 @@ func TestMergeLeft(t *testing.T) { thirdRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_LEAVING, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_JOINING, Tokens: []uint32{5, 10, 20, 100, 200}}, // from firstRing }, @@ -303,7 +303,7 @@ func TestMergeLeft(t *testing.T) { expectedFirstSecondThirdMerge := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_LEAVING, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_LEFT}, }, @@ -314,7 +314,7 @@ func TestMergeLeft(t *testing.T) { our, ch := merge(firstRing(), secondRing()) assert.Equal(t, expectedFirstSecondMerge(), our) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_LEFT}, }, }, ch) @@ -324,7 +324,7 @@ func TestMergeLeft(t *testing.T) { our, ch := merge(firstRing(), secondRingNotNormalised()) assert.Equal(t, expectedFirstSecondMerge(), our) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_LEFT}, }, }, ch) @@ -342,7 +342,7 @@ func TestMergeLeft(t *testing.T) { assert.Equal(t, expectedFirstSecondMerge(), our) // when merging first into second ring, only "Ing 1" is new assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, }, }, ch) @@ -353,7 +353,7 @@ func TestMergeLeft(t *testing.T) { assert.Equal(t, expectedFirstSecondMerge(), our) // when merging first into second ring, only "Ing 1" is new assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, }, }, ch) @@ -376,7 +376,7 @@ func TestMergeRemoveMissing(t *testing.T) { firstRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now, State: InstanceState_JOINING, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now, State: InstanceState_LEAVING, Tokens: []uint32{5, 10, 20, 100, 200}}, @@ -386,7 +386,7 @@ func TestMergeRemoveMissing(t *testing.T) { secondRing := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, }, @@ -395,7 +395,7 @@ func TestMergeRemoveMissing(t *testing.T) { expectedFirstSecondMerge := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now + 3, State: InstanceState_LEFT}, // When deleting, time depends on value passed to merge function. @@ -407,7 +407,7 @@ func TestMergeRemoveMissing(t *testing.T) { our, ch := mergeLocalCAS(firstRing(), secondRing(), now+3) assert.Equal(t, expectedFirstSecondMerge(), our) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now + 3, State: InstanceState_LEFT}, // When deleting, time depends on value passed to merge function. }, @@ -423,7 +423,7 @@ func TestMergeRemoveMissing(t *testing.T) { { // commutativity is broken when deleting missing entries. But let's make sure we get reasonable results at least. our, ch := mergeLocalCAS(secondRing(), firstRing(), now+3) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now, State: InstanceState_LEAVING}, @@ -431,7 +431,7 @@ func TestMergeRemoveMissing(t *testing.T) { }, our) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 3": {Addr: "addr3", Timestamp: now, State: InstanceState_LEAVING}, }, }, ch) @@ -443,7 +443,7 @@ func TestMergeMissingIntoLeft(t *testing.T) { ring1 := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 5, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now, State: InstanceState_LEFT}, @@ -453,7 +453,7 @@ func TestMergeMissingIntoLeft(t *testing.T) { ring2 := func() *Desc { return &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, }, @@ -463,7 +463,7 @@ func TestMergeMissingIntoLeft(t *testing.T) { { our, ch := mergeLocalCAS(ring1(), ring2(), now+10) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, "Ing 3": {Addr: "addr3", Timestamp: now, State: InstanceState_LEFT}, @@ -471,7 +471,7 @@ func TestMergeMissingIntoLeft(t *testing.T) { }, our) assert.Equal(t, &Desc{ - Ingesters: map[string]InstanceDesc{ + Ingesters: map[string]*InstanceDesc{ "Ing 1": {Addr: "addr1", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{30, 40, 50}}, "Ing 2": {Addr: "addr2", Timestamp: now + 10, State: InstanceState_ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}}, // Ing 3 is not changed, it was already InstanceState_LEFT diff --git a/ring/model.go b/ring/model.go index 72d385a42..6dddef1cd 100644 --- a/ring/model.go +++ b/ring/model.go @@ -285,6 +285,10 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now for name, oing := range otherIngesterMap { ting := thisIngesterMap[name] + if ting == nil { + ting = &InstanceDesc{} + } + // ting.Timestamp will be 0, if there was no such ingester in our version if oing.Timestamp > ting.Timestamp { if !tokensEqual(ting.Tokens, oing.Tokens) {