From 02d3f5cd7e8868e72ffd9a67279fa28d07c0d227 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sat, 16 Nov 2024 11:53:13 +0800 Subject: [PATCH] fix(coordinator): choose highest term and entry node as leader (#579) ### Motivation Follow the [TLA+](https://github.com/streamnative/oxia/blob/6d2bf5de8ffd027804aab6302b80a5118367b061/tlaplus/OxiaReplication.tla#L508). We should choose the highest term entry node as the leader, which can also fix the issue we encountered. - The leader can't truncate the following because of the [protection logic](https://github.com/streamnative/oxia/blob/6d2bf5de8ffd027804aab6302b80a5118367b061/server/leader_controller.go#L511-L515). ``` {"level":"info","time":"2024-11-14T01:04:55.195566194Z","component":"shard-controller","namespace":"xxx","shard":10,"term":278,"time":"2024-11-14T01:04:55.195588258Z","message":"Starting leader election"} {"level":"info","time":"2024-11-14T01:04:55.218118449Z","component":"shard-controller","entry-id":{"term":"78","offset":"5652112"},"namespace":"xxx","server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"time":"2024-11-14T01:04:55.218152924Z","message":"Processed newTerm response"} {"level":"info","time":"2024-11-14T01:04:55.218347701Z","component":"shard-controller","entry-id":{"term":"78","offset":"5652112"},"namespace":"xxx","server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"time":"2024-11-14T01:04:55.218369944Z","message":"Processed newTerm response"} {"level":"info","time":"2024-11-14T01:04:55.219517988Z","component":"shard-controller","entry-id":{"term":"77","offset":"5653106"},"namespace":"xxx","server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"time":"2024-11-14T01:04:55.219534113Z","message":"Processed newTerm response"} {"level":"info","time":"2024-11-14T01:04:55.219554195Z","component":"shard-controller","followers":[{"server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"entry-id":{"term":78,"offset":5652112}},{"server-address":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"entry-id":{"term":78,"offset":5652112}}],"namespace":"xxx","new-leader":{"public":"xxxxxxx:6648","internal":"xxxxxxx:6649"},"shard":10,"term":278,"time":"2024-11-14T01:04:55.219564969Z","message":"Successfully moved ensemble to a new term"} ``` Co-authored-by: Matteo Merli --- coordinator/impl/shard_controller.go | 27 ++++++++++++++--------- coordinator/impl/shard_controller_test.go | 18 +++++++++++++++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index c299d76c..9bdc2ce2 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -324,7 +324,7 @@ func (s *shardController) electLeader() error { return err } - newLeader, followers := s.selectNewLeader(fr) + newLeader, followers := selectNewLeader(fr) if s.log.Enabled(context.Background(), slog.LevelInfo) { f := make([]struct { @@ -655,22 +655,27 @@ func (s *shardController) deleteShardRpc(ctx context.Context, node model.ServerA return err } -func (*shardController) selectNewLeader(newTermResponses map[model.ServerAddress]*proto.EntryId) ( +func selectNewLeader(newTermResponses map[model.ServerAddress]*proto.EntryId) ( leader model.ServerAddress, followers map[model.ServerAddress]*proto.EntryId) { + // Select all the nodes that have the highest term first + var currentMaxTerm int64 = -1 // Select all the nodes that have the highest entry in the wal var currentMax int64 = -1 var candidates []model.ServerAddress for addr, headEntryId := range newTermResponses { - switch { - case headEntryId.Offset < currentMax: - continue - case headEntryId.Offset == currentMax: - candidates = append(candidates, addr) - default: - // Found a new max - currentMax = headEntryId.Offset - candidates = []model.ServerAddress{addr} + if headEntryId.Term >= currentMaxTerm { + currentMaxTerm = headEntryId.Term + switch { + case headEntryId.Offset < currentMax: + continue + case headEntryId.Offset == currentMax: + candidates = append(candidates, addr) + default: + // Found a new max + currentMax = headEntryId.Offset + candidates = []model.ServerAddress{addr} + } } } diff --git a/coordinator/impl/shard_controller_test.go b/coordinator/impl/shard_controller_test.go index 725e65eb..b1819cda 100644 --- a/coordinator/impl/shard_controller_test.go +++ b/coordinator/impl/shard_controller_test.go @@ -35,6 +35,24 @@ var namespaceConfig = &model.NamespaceConfig{ NotificationsEnabled: common.OptBooleanDefaultTrue{}, } +func TestLeaderElection_ShouldChooseHighestTerm(t *testing.T) { + s1 := model.ServerAddress{Public: "1", Internal: "1"} + s2 := model.ServerAddress{Public: "2", Internal: "2"} + s3 := model.ServerAddress{Public: "3", Internal: "3"} + candidates := map[model.ServerAddress]*proto.EntryId{ + s1: {Term: 200, Offset: 2480}, + s2: {Term: 200, Offset: 2500}, + s3: {Term: 198, Offset: 3000}, + } + leader, followers := selectNewLeader(candidates) + assert.EqualValues(t, leader, s2) + assert.EqualValues(t, 2, len(followers)) + _, exist := followers[s1] + assert.True(t, exist) + _, exist = followers[s3] + assert.True(t, exist) +} + func TestShardController(t *testing.T) { var shard int64 = 5 rpc := newMockRpcProvider()