diff --git a/go/vt/topo/wildcards.go b/go/vt/topo/wildcards.go index c2921be1d19..d66baf1673f 100644 --- a/go/vt/topo/wildcards.go +++ b/go/vt/topo/wildcards.go @@ -24,6 +24,7 @@ import ( "context" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/fileutil" @@ -64,6 +65,10 @@ type KeyspaceShard struct { Shard string } +func (ks KeyspaceShard) String() string { + return topoproto.KeyspaceShardString(ks.Keyspace, ks.Shard) +} + // ResolveShardWildcard will resolve shard wildcards. Both keyspace and shard // names can use wildcard. Errors talking to the topology server are returned. // ErrNoNode is ignored if it's the result of resolving a wildcard. Examples: diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 73cd61676ca..fb2382e8493 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "maps" "slices" "strings" "sync" @@ -29,11 +28,12 @@ import ( "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -82,71 +82,79 @@ func refreshAllTablets() { }, false /* forceRefresh */) } +// hasShardInKeyRanges returns true if a slice of key ranges contains the provided shard. +func hasShardInKeyRanges(shard string, keyRanges []*topodatapb.KeyRange) (bool, error) { + _, shardKeyRange, err := topo.ValidateShardName(shard) + if err != nil { + return false, err + } + for _, keyRange := range keyRanges { + if key.KeyRangeContainsKeyRange(keyRange, shardKeyRange) { + return true, nil + } + } + return false, nil +} + // getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards. -// This handles both individual shards or key ranges using TabletFilter from the discovery package. func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) { - // Parse input and build list of keyspaces / shards var keyspaceShards []*topo.KeyspaceShard - - keyspaces := make(map[string]map[string]string) - filters := make(map[string][]string) - - keyspaces["ranged"] = map[string]string{} - keyspaces["full"] = map[string]string{} - - for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { - // This is a keyspace/shard specification - input := strings.Split(ks, "/") - keyspaces["ranged"][input[0]] = "ranged" - // filter creation expects a pipe separator between keyspace and shard - filters[input[0]] = append(filters[input[0]], fmt.Sprintf("%s|%s", input[0], input[1])) - - } else { - keyspaces["full"][ks] = "full" + keyspaceWatchKeyRanges := make(map[string][]*topodatapb.KeyRange, 0) + for _, clusterToWatch := range clustersToWatch { + var err error + keyRange := &topodatapb.KeyRange{} + keyspace := clusterToWatch + if strings.Contains(clusterToWatch, "/") { + var shard string + keyspace, shard, err = topoproto.ParseKeyspaceShard(clusterToWatch) + if err != nil { + log.Errorf("Failed to parse keyspace/shard %q: %+v", clusterToWatch, err) + continue + } + shard, keyRange, err = topo.ValidateShardName(shard) + if err != nil { + log.Errorf("Failed to parse shard name %q: %+v", shard, err) + continue + } } + keyspaceWatchKeyRanges[keyspace] = append(keyspaceWatchKeyRanges[keyspace], keyRange) } - // Copy function will combine the two maps. It will override any keyspaces in ranged that also exist in full with the - // full designation because we assume that the full keyspace will take precedence over a keyspace/shard specification within the same input. - // e.g. If the clustersToWatch is `ks1,...ks1/10-20`, all tablets in ks1 should be watched. - maps.Copy(keyspaces["ranged"], keyspaces["full"]) - - if len(keyspaces["ranged"]) > 0 { - for ks, filterType := range keyspaces["ranged"] { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + eg, ctx := errgroup.WithContext(ctx) - shards, err := ts.GetShardNames(ctx, ks) + var keyspaceShardsMu sync.Mutex + for keyspace, watchKeyRanges := range keyspaceWatchKeyRanges { + eg.Go(func() error { + shards, err := ts.GetShardNames(ctx, keyspace) if err != nil { - // Log the errr and continue - log.Errorf("Error fetching shards for keyspace: %v", ks) - continue + log.Errorf("Error fetching shards for keyspace: %v", keyspace) + return err } if len(shards) == 0 { - log.Errorf("Topo has no shards for ks: %v", ks) - continue + log.Errorf("Topo has no shards for keyspace %v", keyspace) + return nil } - if filterType == "ranged" { - shardFilter, err := discovery.NewFilterByShard(filters[ks]) - if err != nil { - log.Error(err) - return keyspaceShards, err - } - - for _, s := range shards { - if shardFilter.IsIncluded(&topodatapb.Tablet{Keyspace: ks, Shard: s}) { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) - } - } - } else { - for _, s := range shards { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) + for _, s := range shards { + if found, err := hasShardInKeyRanges(s, watchKeyRanges); err != nil { + log.Errorf("Failed to parse key ranges for shard %q: %+v", s, err) + } else if found { + keyspaceShardsMu.Lock() + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{ + Keyspace: keyspace, + Shard: s, + }) + keyspaceShardsMu.Unlock() } } - } + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err } return keyspaceShards, nil diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index c4aef5ba0eb..fce721aac58 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -19,6 +19,8 @@ package logic import ( "context" "fmt" + "slices" + "strings" "sync/atomic" "testing" "time" @@ -396,6 +398,13 @@ func TestGetKeyspaceShardsToWatch(t *testing.T) { clustersToWatch = testcase.clusters res, err := getKeyspaceShardsToWatch() + slices.SortStableFunc(res, func(a, b *topo.KeyspaceShard) int { + return strings.Compare(a.String(), b.String()) + }) + slices.SortStableFunc(testcase.expected, func(a, b *topo.KeyspaceShard) int { + return strings.Compare(a.String(), b.String()) + }) + assert.NoError(t, err) assert.ElementsMatch(t, testcase.expected, res) })