From e1bbd104f81a989db620e6bcfe1056c889c50db7 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Wed, 17 Jul 2024 15:26:17 -0700 Subject: [PATCH 1/6] add keyrange support for vtorc clusters_to_watch Signed-off-by: Priya Bibra --- go/vt/vtorc/logic/tablet_discovery.go | 104 +++++++++++++++++++------- 1 file changed, 78 insertions(+), 26 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 6914ebd546d..b3a2512883d 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "maps" "slices" "strings" "sync" @@ -31,6 +32,7 @@ import ( "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -57,7 +59,7 @@ var ( // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { - fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } @@ -81,6 +83,76 @@ func refreshAllTablets() { }, false /* forceRefresh */) } +// getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards. +// This handles both individual shards or key ranges using TabletFilter from the discovery package. +func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) { + // Parse input and build list of keyspaces / shards + var keyspaceShards []*topo.KeyspaceShard + + keyspaces := make(map[string]map[string]string) + filters := make(map[string][]string) + + keyspaces["ranged"] = map[string]string{} + keyspaces["full"] = map[string]string{} + + for _, ks := range clustersToWatch { + if strings.Contains(ks, "/") { + // This is a keyspace/shard specification + input := strings.Split(ks, "/") + keyspaces["ranged"][input[0]] = "ranged" + // filter creation expects a pipe separator between keyspace and shard + filters[input[0]] = append(filters[input[0]], fmt.Sprintf("%s|%s", input[0], input[1])) + + } else { + keyspaces["full"][ks] = "full" + } + } + + // Copy function will combine the two maps. It will override any keyspaces in ranged that also exist in full with the + // full designation because we assume that the full keyspace will take precedence over a keyspace/shard specification within the same input. + // e.g. If the clustersToWatch is `ks1,...ks1/10-20`, all tablets in ks1 should be watched. + maps.Copy(keyspaces["ranged"], keyspaces["full"]) + + if len(keyspaces["ranged"]) > 0 { + for ks, filterType := range keyspaces["ranged"] { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + + shards, err := ts.GetShardNames(ctx, ks) + if err != nil { + // Log the errr and continue + log.Errorf("Error fetching shards for keyspace: %v", ks) + continue + } + + if len(shards) == 0 { + log.Errorf("Topo has no shards for ks: %v", ks) + continue + } + + if filterType == "ranged" { + shardFilter, err := discovery.NewFilterByShard(filters[ks]) + if err != nil { + log.Error(err) + return keyspaceShards, err + } + + for _, s := range shards { + if shardFilter.IsIncluded(&topodatapb.Tablet{Keyspace: ks, Shard: s}) { + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) + } + } + } else { + for _, s := range shards { + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) + } + } + } + } + + return keyspaceShards, nil +} + func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { if !IsLeaderOrActive() { return @@ -106,32 +178,12 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } wg.Wait() } else { - // Parse input and build list of keyspaces / shards - var keyspaceShards []*topo.KeyspaceShard - for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { - // This is a keyspace/shard specification - input := strings.Split(ks, "/") - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]}) - } else { - // Assume this is a keyspace and find all shards in keyspace - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - shards, err := ts.GetShardNames(ctx, ks) - if err != nil { - // Log the errr and continue - log.Errorf("Error fetching shards for keyspace: %v", ks) - continue - } - if len(shards) == 0 { - log.Errorf("Topo has no shards for ks: %v", ks) - continue - } - for _, s := range shards { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) - } - } + keyspaceShards, err := getKeyspaceShardsToWatch() + if err != nil { + log.Error(err) + return } + if len(keyspaceShards) == 0 { log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch) return From 80a25e6c810748504da7a85e91b4b72c45ea352d Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Wed, 17 Jul 2024 16:29:14 -0700 Subject: [PATCH 2/6] update help txt Signed-off-by: Priya Bibra --- go/flags/endtoend/vtorc.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index aa0b03c63ff..b7714068ff7 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -25,7 +25,7 @@ Flags: --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. --catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified --change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED - --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80" + --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80" --config string config file name --config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored. --config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn) From 8472a63939eff3a9d0bb30bd24d1576e1bfc8d6d Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Thu, 18 Jul 2024 14:18:33 -0700 Subject: [PATCH 3/6] add unit tests Signed-off-by: Priya Bibra --- go/vt/vtorc/logic/tablet_discovery.go | 2 +- go/vt/vtorc/logic/tablet_discovery_test.go | 109 +++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index b3a2512883d..cd112481edc 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -184,7 +184,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { return } - if len(keyspaceShards) == 0 { + if len(keyspaceShards) == 0 || keyspaceShards == nil { log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch) return } diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index e31b49e4b0e..53c643306da 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -277,6 +277,115 @@ func TestShardPrimary(t *testing.T) { } } +func TestGetKeyspaceShardsToWatch(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts = memorytopo.NewServer(ctx, "test_cell") + + shards1 := []string{"-40", "40-50", "50-60", "60-70", "70-80", "80-"} + keyspace1 := "test_keyspace" + + shards2 := []string{"-1000", "1000-1100", "1100-1200", "1200-1300", "1300-"} + keyspace2 := "test_keyspace2" + + if err := ts.CreateKeyspace(ctx, keyspace1, &topodatapb.Keyspace{}); err != nil { + t.Fatalf("cannot create keyspace: %v", err) + } + + if err := ts.CreateKeyspace(ctx, keyspace2, &topodatapb.Keyspace{}); err != nil { + t.Fatalf("cannot create keyspace: %v", err) + } + + for _, shard := range shards1 { + if err := ts.CreateShard(ctx, keyspace1, shard); err != nil { + t.Fatalf("cannot create shard: %v", err) + } + } + + for _, shard := range shards2 { + if err := ts.CreateShard(ctx, keyspace2, shard); err != nil { + t.Fatalf("cannot create shard: %v", err) + } + } + + testcases := []*struct { + name string + clusters []string + expected []*topo.KeyspaceShard + }{ + { + name: "single shard and range", + clusters: []string{fmt.Sprintf("%s/40-50", keyspace1), fmt.Sprintf("%s/60-80", keyspace1)}, + expected: []*topo.KeyspaceShard{ + {Keyspace: keyspace1, Shard: "40-50"}, + {Keyspace: keyspace1, Shard: "60-70"}, + {Keyspace: keyspace1, Shard: "70-80"}, + }, + }, { + name: "single shard", + clusters: []string{fmt.Sprintf("%s/40-50", keyspace1)}, + expected: []*topo.KeyspaceShard{{Keyspace: keyspace1, Shard: "40-50"}}, + }, { + name: "full keyspace", + clusters: []string{keyspace1}, + expected: []*topo.KeyspaceShard{ + {Keyspace: keyspace1, Shard: "-40"}, + {Keyspace: keyspace1, Shard: "40-50"}, + {Keyspace: keyspace1, Shard: "50-60"}, + {Keyspace: keyspace1, Shard: "60-70"}, + {Keyspace: keyspace1, Shard: "70-80"}, + {Keyspace: keyspace1, Shard: "80-"}, + }, + }, { + name: "full keyspace with keyrange", + clusters: []string{keyspace1, fmt.Sprintf("%s/60-80", keyspace1)}, + expected: []*topo.KeyspaceShard{ + {Keyspace: keyspace1, Shard: "-40"}, + {Keyspace: keyspace1, Shard: "40-50"}, + {Keyspace: keyspace1, Shard: "50-60"}, + {Keyspace: keyspace1, Shard: "60-70"}, + {Keyspace: keyspace1, Shard: "70-80"}, + {Keyspace: keyspace1, Shard: "80-"}, + }, + }, { + name: "multi keyspace", + clusters: []string{keyspace1, fmt.Sprintf("%s/1100-1300", keyspace2)}, + expected: []*topo.KeyspaceShard{ + {Keyspace: keyspace2, Shard: "1100-1200"}, + {Keyspace: keyspace2, Shard: "1200-1300"}, + {Keyspace: keyspace1, Shard: "-40"}, + {Keyspace: keyspace1, Shard: "40-50"}, + {Keyspace: keyspace1, Shard: "50-60"}, + {Keyspace: keyspace1, Shard: "60-70"}, + {Keyspace: keyspace1, Shard: "70-80"}, + {Keyspace: keyspace1, Shard: "80-"}, + }, + }, { + name: "partial success with non-existent shard", + clusters: []string{"test_keyspace3/10-20", fmt.Sprintf("%s/1100-1300", keyspace2)}, + expected: []*topo.KeyspaceShard{ + {Keyspace: keyspace2, Shard: "1100-1200"}, + {Keyspace: keyspace2, Shard: "1200-1300"}, + }, + }, { + name: "empty result", + clusters: []string{"test_keyspace3/10-20"}, + expected: nil, + }, + } + + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + clustersToWatch = testcase.clusters + res, err := getKeyspaceShardsToWatch() + + assert.NoError(t, err) + assert.Equal(t, testcase.expected, res) + }) + } +} + // verifyRefreshTabletsInKeyspaceShard calls refreshTabletsInKeyspaceShard with the forceRefresh parameter provided and verifies that // the number of instances refreshed matches the parameter and all the tablets match the ones provided func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet, tabletsToIgnore []string) { From f46ab3753b1d434feb988ed80a300927fbbdae1a Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Thu, 18 Jul 2024 15:10:52 -0700 Subject: [PATCH 4/6] more test cases Signed-off-by: Priya Bibra --- go/vt/vtorc/logic/tablet_discovery_test.go | 108 ++++++++++++--------- 1 file changed, 62 insertions(+), 46 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 53c643306da..3ade4a0c76e 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -283,32 +283,36 @@ func TestGetKeyspaceShardsToWatch(t *testing.T) { ts = memorytopo.NewServer(ctx, "test_cell") - shards1 := []string{"-40", "40-50", "50-60", "60-70", "70-80", "80-"} - keyspace1 := "test_keyspace" - - shards2 := []string{"-1000", "1000-1100", "1100-1200", "1200-1300", "1300-"} - keyspace2 := "test_keyspace2" - - if err := ts.CreateKeyspace(ctx, keyspace1, &topodatapb.Keyspace{}); err != nil { - t.Fatalf("cannot create keyspace: %v", err) + keyspaces := []string{"test_keyspace", "test_keyspace2", "test_keyspace3", "test_keyspace4"} + for _, k := range keyspaces { + if err := ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{}); err != nil { + t.Fatalf("cannot create keyspace: %v", err) + } } - if err := ts.CreateKeyspace(ctx, keyspace2, &topodatapb.Keyspace{}); err != nil { - t.Fatalf("cannot create keyspace: %v", err) - } + shards1 := []string{"-40", "40-50", "50-60", "60-70", "70-80", "80-"} + shards2 := []string{"-1000", "1000-1100", "1100-1200", "1200-1300", "1300-"} for _, shard := range shards1 { - if err := ts.CreateShard(ctx, keyspace1, shard); err != nil { + if err := ts.CreateShard(ctx, keyspaces[0], shard); err != nil { t.Fatalf("cannot create shard: %v", err) } } for _, shard := range shards2 { - if err := ts.CreateShard(ctx, keyspace2, shard); err != nil { + if err := ts.CreateShard(ctx, keyspaces[1], shard); err != nil { t.Fatalf("cannot create shard: %v", err) } } + if err := ts.CreateShard(ctx, keyspaces[2], "-"); err != nil { + t.Fatalf("cannot create shard: %v", err) + } + + if err := ts.CreateShard(ctx, keyspaces[3], "0"); err != nil { + t.Fatalf("cannot create shard: %v", err) + } + testcases := []*struct { name string clusters []string @@ -316,62 +320,74 @@ func TestGetKeyspaceShardsToWatch(t *testing.T) { }{ { name: "single shard and range", - clusters: []string{fmt.Sprintf("%s/40-50", keyspace1), fmt.Sprintf("%s/60-80", keyspace1)}, + clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0]), fmt.Sprintf("%s/60-80", keyspaces[0])}, expected: []*topo.KeyspaceShard{ - {Keyspace: keyspace1, Shard: "40-50"}, - {Keyspace: keyspace1, Shard: "60-70"}, - {Keyspace: keyspace1, Shard: "70-80"}, + {Keyspace: keyspaces[0], Shard: "40-50"}, + {Keyspace: keyspaces[0], Shard: "60-70"}, + {Keyspace: keyspaces[0], Shard: "70-80"}, }, }, { name: "single shard", - clusters: []string{fmt.Sprintf("%s/40-50", keyspace1)}, - expected: []*topo.KeyspaceShard{{Keyspace: keyspace1, Shard: "40-50"}}, + clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0])}, + expected: []*topo.KeyspaceShard{{Keyspace: keyspaces[0], Shard: "40-50"}}, }, { name: "full keyspace", - clusters: []string{keyspace1}, + clusters: []string{keyspaces[0]}, expected: []*topo.KeyspaceShard{ - {Keyspace: keyspace1, Shard: "-40"}, - {Keyspace: keyspace1, Shard: "40-50"}, - {Keyspace: keyspace1, Shard: "50-60"}, - {Keyspace: keyspace1, Shard: "60-70"}, - {Keyspace: keyspace1, Shard: "70-80"}, - {Keyspace: keyspace1, Shard: "80-"}, + {Keyspace: keyspaces[0], Shard: "-40"}, + {Keyspace: keyspaces[0], Shard: "40-50"}, + {Keyspace: keyspaces[0], Shard: "50-60"}, + {Keyspace: keyspaces[0], Shard: "60-70"}, + {Keyspace: keyspaces[0], Shard: "70-80"}, + {Keyspace: keyspaces[0], Shard: "80-"}, }, }, { name: "full keyspace with keyrange", - clusters: []string{keyspace1, fmt.Sprintf("%s/60-80", keyspace1)}, + clusters: []string{keyspaces[0], fmt.Sprintf("%s/60-80", keyspaces[0])}, expected: []*topo.KeyspaceShard{ - {Keyspace: keyspace1, Shard: "-40"}, - {Keyspace: keyspace1, Shard: "40-50"}, - {Keyspace: keyspace1, Shard: "50-60"}, - {Keyspace: keyspace1, Shard: "60-70"}, - {Keyspace: keyspace1, Shard: "70-80"}, - {Keyspace: keyspace1, Shard: "80-"}, + {Keyspace: keyspaces[0], Shard: "-40"}, + {Keyspace: keyspaces[0], Shard: "40-50"}, + {Keyspace: keyspaces[0], Shard: "50-60"}, + {Keyspace: keyspaces[0], Shard: "60-70"}, + {Keyspace: keyspaces[0], Shard: "70-80"}, + {Keyspace: keyspaces[0], Shard: "80-"}, }, }, { name: "multi keyspace", - clusters: []string{keyspace1, fmt.Sprintf("%s/1100-1300", keyspace2)}, + clusters: []string{keyspaces[0], fmt.Sprintf("%s/1100-1300", keyspaces[1])}, expected: []*topo.KeyspaceShard{ - {Keyspace: keyspace2, Shard: "1100-1200"}, - {Keyspace: keyspace2, Shard: "1200-1300"}, - {Keyspace: keyspace1, Shard: "-40"}, - {Keyspace: keyspace1, Shard: "40-50"}, - {Keyspace: keyspace1, Shard: "50-60"}, - {Keyspace: keyspace1, Shard: "60-70"}, - {Keyspace: keyspace1, Shard: "70-80"}, - {Keyspace: keyspace1, Shard: "80-"}, + {Keyspace: keyspaces[1], Shard: "1100-1200"}, + {Keyspace: keyspaces[1], Shard: "1200-1300"}, + {Keyspace: keyspaces[0], Shard: "-40"}, + {Keyspace: keyspaces[0], Shard: "40-50"}, + {Keyspace: keyspaces[0], Shard: "50-60"}, + {Keyspace: keyspaces[0], Shard: "60-70"}, + {Keyspace: keyspaces[0], Shard: "70-80"}, + {Keyspace: keyspaces[0], Shard: "80-"}, }, }, { name: "partial success with non-existent shard", - clusters: []string{"test_keyspace3/10-20", fmt.Sprintf("%s/1100-1300", keyspace2)}, + clusters: []string{"non-existent/10-20", fmt.Sprintf("%s/1100-1300", keyspaces[1])}, expected: []*topo.KeyspaceShard{ - {Keyspace: keyspace2, Shard: "1100-1200"}, - {Keyspace: keyspace2, Shard: "1200-1300"}, + {Keyspace: keyspaces[1], Shard: "1100-1200"}, + {Keyspace: keyspaces[1], Shard: "1200-1300"}, }, }, { name: "empty result", - clusters: []string{"test_keyspace3/10-20"}, + clusters: []string{"non-existent/10-20"}, expected: nil, + }, { + name: "single keyspace -", + clusters: []string{keyspaces[2]}, + expected: []*topo.KeyspaceShard{ + {Keyspace: keyspaces[2], Shard: "-"}, + }, + }, { + name: "single keyspace 0", + clusters: []string{keyspaces[3]}, + expected: []*topo.KeyspaceShard{ + {Keyspace: keyspaces[3], Shard: "0"}, + }, }, } From d26a840fc159d8f64b1eb8a4215293db97821a49 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Thu, 18 Jul 2024 16:06:23 -0700 Subject: [PATCH 5/6] update assert Signed-off-by: Priya Bibra --- go/vt/vtorc/logic/tablet_discovery_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 3ade4a0c76e..ad45487e893 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -397,7 +397,7 @@ func TestGetKeyspaceShardsToWatch(t *testing.T) { res, err := getKeyspaceShardsToWatch() assert.NoError(t, err) - assert.Equal(t, testcase.expected, res) + assert.EqualValues(t, testcase.expected, res) }) } } From a8b93f75b4bad8dab50b5b2847d41e53814ce0bb Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Wed, 24 Jul 2024 10:34:50 -0700 Subject: [PATCH 6/6] trigger re-build Signed-off-by: Priya Bibra