diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 6914ebd546d..b3a2512883d 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "maps" "slices" "strings" "sync" @@ -31,6 +32,7 @@ import ( "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -57,7 +59,7 @@ var ( // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { - fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } @@ -81,6 +83,76 @@ func refreshAllTablets() { }, false /* forceRefresh */) } +// getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards. +// This handles both individual shards or key ranges using TabletFilter from the discovery package. +func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) { + // Parse input and build list of keyspaces / shards + var keyspaceShards []*topo.KeyspaceShard + + keyspaces := make(map[string]map[string]string) + filters := make(map[string][]string) + + keyspaces["ranged"] = map[string]string{} + keyspaces["full"] = map[string]string{} + + for _, ks := range clustersToWatch { + if strings.Contains(ks, "/") { + // This is a keyspace/shard specification + input := strings.Split(ks, "/") + keyspaces["ranged"][input[0]] = "ranged" + // filter creation expects a pipe separator between keyspace and shard + filters[input[0]] = append(filters[input[0]], fmt.Sprintf("%s|%s", input[0], input[1])) + + } else { + keyspaces["full"][ks] = "full" + } + } + + // Copy function will combine the two maps. It will override any keyspaces in ranged that also exist in full with the + // full designation because we assume that the full keyspace will take precedence over a keyspace/shard specification within the same input. + // e.g. If the clustersToWatch is `ks1,...ks1/10-20`, all tablets in ks1 should be watched. + maps.Copy(keyspaces["ranged"], keyspaces["full"]) + + if len(keyspaces["ranged"]) > 0 { + for ks, filterType := range keyspaces["ranged"] { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + + shards, err := ts.GetShardNames(ctx, ks) + if err != nil { + // Log the errr and continue + log.Errorf("Error fetching shards for keyspace: %v", ks) + continue + } + + if len(shards) == 0 { + log.Errorf("Topo has no shards for ks: %v", ks) + continue + } + + if filterType == "ranged" { + shardFilter, err := discovery.NewFilterByShard(filters[ks]) + if err != nil { + log.Error(err) + return keyspaceShards, err + } + + for _, s := range shards { + if shardFilter.IsIncluded(&topodatapb.Tablet{Keyspace: ks, Shard: s}) { + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) + } + } + } else { + for _, s := range shards { + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) + } + } + } + } + + return keyspaceShards, nil +} + func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { if !IsLeaderOrActive() { return @@ -106,32 +178,12 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } wg.Wait() } else { - // Parse input and build list of keyspaces / shards - var keyspaceShards []*topo.KeyspaceShard - for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { - // This is a keyspace/shard specification - input := strings.Split(ks, "/") - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]}) - } else { - // Assume this is a keyspace and find all shards in keyspace - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - shards, err := ts.GetShardNames(ctx, ks) - if err != nil { - // Log the errr and continue - log.Errorf("Error fetching shards for keyspace: %v", ks) - continue - } - if len(shards) == 0 { - log.Errorf("Topo has no shards for ks: %v", ks) - continue - } - for _, s := range shards { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) - } - } + keyspaceShards, err := getKeyspaceShardsToWatch() + if err != nil { + log.Error(err) + return } + if len(keyspaceShards) == 0 { log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch) return