From 496aeffc7282ff7d0ff3c1b2a88d4c3c06b82eae Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 15 Jan 2025 20:13:28 +0100 Subject: [PATCH] concurrency Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 64 ++++++++++++++++----------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 020b82f1fa1..8ef51a96c10 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -28,6 +28,7 @@ import ( "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" @@ -82,22 +83,20 @@ func refreshAllTablets() { } // keyRangesContainShard returns true if a slice of key ranges contains the provided shard. -func keyRangesContainShard(keyRanges []*topodatapb.KeyRange, shard string) bool { - _, shardKeyRange, err := topo.ValidateShardName(shard) +func keyRangesContainShard(keyRanges []*topodatapb.KeyRange, shard string) (bool, error) { + shard, shardKeyRange, err := topo.ValidateShardName(shard) if err != nil { - log.Errorf("Failed to validate shard name %q: %+v", shard, err) - return false + return false, err } for _, keyRange := range keyRanges { if key.KeyRangeContainsKeyRange(keyRange, shardKeyRange) { - return true + return true, nil } } - return false + return false, nil } // getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards. -// This handles both individual shards or key ranges using TabletFilter from the discovery package. func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) { var keyspaceShards []*topo.KeyspaceShard keyspaceKeyRanges := make(map[string][]*topodatapb.KeyRange) @@ -121,30 +120,41 @@ func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) { keyspaceKeyRanges[keyspace] = append(keyspaceKeyRanges[keyspace], keyRange) } + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + eg, ctx := errgroup.WithContext(ctx) + + var keyspaceShardsMu sync.Mutex for keyspace, keyRanges := range keyspaceKeyRanges { - shards, err := func() ([]string, error) { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - return ts.GetShardNames(ctx, keyspace) - }() - if err != nil { - log.Errorf("Error fetching shards for keyspace: %v", keyspace) - return nil, err - } + eg.Go(func() error { + shards, err := ts.GetShardNames(ctx, keyspace) + if err != nil { + log.Errorf("Error fetching shards for keyspace: %v", keyspace) + return err + } - if len(shards) == 0 { - log.Errorf("Topo has no shards for keyspace %v", keyspace) - continue - } + if len(shards) == 0 { + log.Errorf("Topo has no shards for keyspace %v", keyspace) + return nil + } - for _, s := range shards { - if keyRangesContainShard(keyRanges, s) { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{ - Keyspace: keyspace, - Shard: s, - }) + for _, s := range shards { + if found, err := keyRangesContainShard(keyRanges, s); err != nil { + log.Errorf("Failed to parse key ranges for shard %q: %+v", s, err) + } else if found { + keyspaceShardsMu.Lock() + defer keyspaceShardsMu.Unlock() + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{ + Keyspace: keyspace, + Shard: s, + }) + } } - } + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err } return keyspaceShards, nil