Skip to content

Commit

Permalink
concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Jan 15, 2025
1 parent 266ff9a commit 496aeff
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/spf13/pflag"

"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -82,22 +83,20 @@ func refreshAllTablets() {
}

// keyRangesContainShard returns true if a slice of key ranges contains the provided shard.
func keyRangesContainShard(keyRanges []*topodatapb.KeyRange, shard string) bool {
_, shardKeyRange, err := topo.ValidateShardName(shard)
func keyRangesContainShard(keyRanges []*topodatapb.KeyRange, shard string) (bool, error) {
shard, shardKeyRange, err := topo.ValidateShardName(shard)

Check failure on line 87 in go/vt/vtorc/logic/tablet_discovery.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

ineffectual assignment to shard (ineffassign)

Check failure on line 87 in go/vt/vtorc/logic/tablet_discovery.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

ineffectual assignment to shard (ineffassign)
if err != nil {
log.Errorf("Failed to validate shard name %q: %+v", shard, err)
return false
return false, err
}
for _, keyRange := range keyRanges {
if key.KeyRangeContainsKeyRange(keyRange, shardKeyRange) {
return true
return true, nil
}
}
return false
return false, nil
}

// getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards.
// This handles both individual shards or key ranges using TabletFilter from the discovery package.
func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) {
var keyspaceShards []*topo.KeyspaceShard
keyspaceKeyRanges := make(map[string][]*topodatapb.KeyRange)
Expand All @@ -121,30 +120,41 @@ func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) {
keyspaceKeyRanges[keyspace] = append(keyspaceKeyRanges[keyspace], keyRange)
}

ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)

var keyspaceShardsMu sync.Mutex
for keyspace, keyRanges := range keyspaceKeyRanges {
shards, err := func() ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
return ts.GetShardNames(ctx, keyspace)
}()
if err != nil {
log.Errorf("Error fetching shards for keyspace: %v", keyspace)
return nil, err
}
eg.Go(func() error {
shards, err := ts.GetShardNames(ctx, keyspace)
if err != nil {
log.Errorf("Error fetching shards for keyspace: %v", keyspace)
return err
}

if len(shards) == 0 {
log.Errorf("Topo has no shards for keyspace %v", keyspace)
continue
}
if len(shards) == 0 {
log.Errorf("Topo has no shards for keyspace %v", keyspace)
return nil
}

for _, s := range shards {
if keyRangesContainShard(keyRanges, s) {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{
Keyspace: keyspace,
Shard: s,
})
for _, s := range shards {
if found, err := keyRangesContainShard(keyRanges, s); err != nil {
log.Errorf("Failed to parse key ranges for shard %q: %+v", s, err)
} else if found {
keyspaceShardsMu.Lock()
defer keyspaceShardsMu.Unlock()
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{
Keyspace: keyspace,
Shard: s,
})
}
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}

return keyspaceShards, nil
Expand Down

0 comments on commit 496aeff

Please sign in to comment.