Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add keyrange support for vtorc clusters_to_watch #457

Merged
merged 8 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config string config file name
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
Expand Down
106 changes: 79 additions & 27 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"slices"
"strings"
"sync"
Expand All @@ -31,6 +32,7 @@ import (
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand All @@ -57,7 +59,7 @@ var (

// RegisterFlags registers the flags required by VTOrc on the given flag set:
// --clusters_to_watch (backed by clustersToWatch) and --shutdown_wait_time
// (backed by shutdownWaitTime). The current values of those package variables
// are used as the flag defaults.
func RegisterFlags(fs *pflag.FlagSet) {
	// Each flag name must be registered exactly once on a flag set.
	fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
	fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}

Expand All @@ -81,6 +83,76 @@ func refreshAllTablets() {
}, false /* forceRefresh */)
}

// getKeyspaceShardsToWatch expands the clustersToWatch flag value into the
// individual keyspace/shards found in the topology.
//
// Each entry of clustersToWatch is either a bare keyspace ("ks1"), meaning all
// of its shards, or "keyspace/shard" ("ks2/-80"), where everything after the
// first slash is handed to a shard filter from the discovery package and only
// the shards that filter includes are returned. When a keyspace is listed both
// bare and with a shard, the bare form wins and every shard of that keyspace
// is returned.
//
// Keyspaces whose shards cannot be listed, or that have no shards in the topo,
// are logged and skipped. A non-nil error is returned only when a shard filter
// cannot be built, in which case the returned slice is nil. The order of
// keyspaces in the result follows map iteration order and is therefore
// unspecified.
func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) {
	// watchAllShards records, per requested keyspace, whether every shard is
	// wanted (true) or only the shards matching filters[keyspace] (false).
	watchAllShards := make(map[string]bool)
	wholeKeyspaces := make(map[string]bool)
	filters := make(map[string][]string)

	for _, cluster := range clustersToWatch {
		keyspace, shard, isShardSpec := strings.Cut(cluster, "/")
		if !isShardSpec {
			wholeKeyspaces[cluster] = true
			continue
		}
		watchAllShards[keyspace] = false
		// Filter creation expects a pipe separator between keyspace and shard.
		filters[keyspace] = append(filters[keyspace], fmt.Sprintf("%s|%s", keyspace, shard))
	}

	// A bare keyspace takes precedence over any keyspace/shard entries for the
	// same keyspace, e.g. `ks1,...,ks1/10-20` watches all tablets in ks1.
	maps.Copy(watchAllShards, wholeKeyspaces)

	var keyspaceShards []*topo.KeyspaceShard
	for keyspace, allShards := range watchAllShards {
		// Release the timeout context as soon as the call returns; a defer here
		// would keep every iteration's context alive until the function exits.
		ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
		shards, err := ts.GetShardNames(ctx, keyspace)
		cancel()
		if err != nil {
			// Log the error and continue with the remaining keyspaces.
			log.Errorf("Error fetching shards for keyspace %v: %v", keyspace, err)
			continue
		}
		if len(shards) == 0 {
			log.Errorf("Topo has no shards for ks: %v", keyspace)
			continue
		}

		if allShards {
			for _, s := range shards {
				keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: keyspace, Shard: s})
			}
			continue
		}

		shardFilter, err := discovery.NewFilterByShard(filters[keyspace])
		if err != nil {
			// The caller logs the error; return it with context rather than
			// logging it twice or handing back a partially built result.
			return nil, fmt.Errorf("building shard filter for keyspace %q: %w", keyspace, err)
		}
		for _, s := range shards {
			if shardFilter.IsIncluded(&topodatapb.Tablet{Keyspace: keyspace, Shard: s}) {
				keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: keyspace, Shard: s})
			}
		}
	}

	return keyspaceShards, nil
}

func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
if !IsLeaderOrActive() {
return
Expand All @@ -106,33 +178,13 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}
wg.Wait()
} else {
// Parse input and build list of keyspaces / shards
var keyspaceShards []*topo.KeyspaceShard
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") {
// This is a keyspace/shard specification
input := strings.Split(ks, "/")
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
// Log the error and continue
log.Errorf("Error fetching shards for keyspace: %v", ks)
continue
}
if len(shards) == 0 {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
}
for _, s := range shards {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
}
}
keyspaceShards, err := getKeyspaceShardsToWatch()
if err != nil {
log.Error(err)
return
}
if len(keyspaceShards) == 0 {

if len(keyspaceShards) == 0 || keyspaceShards == nil {
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
return
}
Expand Down
109 changes: 109 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,115 @@ func TestShardPrimary(t *testing.T) {
}
}

func TestGetKeyspaceShardsToWatch(t *testing.T) {
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ts = memorytopo.NewServer(ctx, "test_cell")

shards1 := []string{"-40", "40-50", "50-60", "60-70", "70-80", "80-"}
keyspace1 := "test_keyspace"

shards2 := []string{"-1000", "1000-1100", "1100-1200", "1200-1300", "1300-"}
keyspace2 := "test_keyspace2"

if err := ts.CreateKeyspace(ctx, keyspace1, &topodatapb.Keyspace{}); err != nil {
t.Fatalf("cannot create keyspace: %v", err)
}

if err := ts.CreateKeyspace(ctx, keyspace2, &topodatapb.Keyspace{}); err != nil {
t.Fatalf("cannot create keyspace: %v", err)
}

for _, shard := range shards1 {
if err := ts.CreateShard(ctx, keyspace1, shard); err != nil {
t.Fatalf("cannot create shard: %v", err)
}
}

for _, shard := range shards2 {
if err := ts.CreateShard(ctx, keyspace2, shard); err != nil {
t.Fatalf("cannot create shard: %v", err)
}
}

testcases := []*struct {
name string
clusters []string
expected []*topo.KeyspaceShard
}{
{
name: "single shard and range",
clusters: []string{fmt.Sprintf("%s/40-50", keyspace1), fmt.Sprintf("%s/60-80", keyspace1)},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspace1, Shard: "40-50"},
{Keyspace: keyspace1, Shard: "60-70"},
{Keyspace: keyspace1, Shard: "70-80"},
},
}, {
name: "single shard",
clusters: []string{fmt.Sprintf("%s/40-50", keyspace1)},
expected: []*topo.KeyspaceShard{{Keyspace: keyspace1, Shard: "40-50"}},
}, {
name: "full keyspace",
clusters: []string{keyspace1},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspace1, Shard: "-40"},
{Keyspace: keyspace1, Shard: "40-50"},
{Keyspace: keyspace1, Shard: "50-60"},
{Keyspace: keyspace1, Shard: "60-70"},
{Keyspace: keyspace1, Shard: "70-80"},
{Keyspace: keyspace1, Shard: "80-"},
},
}, {
name: "full keyspace with keyrange",
clusters: []string{keyspace1, fmt.Sprintf("%s/60-80", keyspace1)},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspace1, Shard: "-40"},
{Keyspace: keyspace1, Shard: "40-50"},
{Keyspace: keyspace1, Shard: "50-60"},
{Keyspace: keyspace1, Shard: "60-70"},
{Keyspace: keyspace1, Shard: "70-80"},
{Keyspace: keyspace1, Shard: "80-"},
},
}, {
name: "multi keyspace",
clusters: []string{keyspace1, fmt.Sprintf("%s/1100-1300", keyspace2)},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspace2, Shard: "1100-1200"},
{Keyspace: keyspace2, Shard: "1200-1300"},
{Keyspace: keyspace1, Shard: "-40"},
{Keyspace: keyspace1, Shard: "40-50"},
{Keyspace: keyspace1, Shard: "50-60"},
{Keyspace: keyspace1, Shard: "60-70"},
{Keyspace: keyspace1, Shard: "70-80"},
{Keyspace: keyspace1, Shard: "80-"},
},
}, {
name: "partial success with non-existent shard",
clusters: []string{"test_keyspace3/10-20", fmt.Sprintf("%s/1100-1300", keyspace2)},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspace2, Shard: "1100-1200"},
{Keyspace: keyspace2, Shard: "1200-1300"},
},
}, {
name: "empty result",
clusters: []string{"test_keyspace3/10-20"},
expected: nil,
},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
clustersToWatch = testcase.clusters
res, err := getKeyspaceShardsToWatch()

assert.NoError(t, err)
assert.Equal(t, testcase.expected, res)
})
}
}

// verifyRefreshTabletsInKeyspaceShard calls refreshTabletsInKeyspaceShard with the forceRefresh parameter provided and verifies that
// the number of instances refreshed matches the parameter and all the tablets match the ones provided
func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet, tabletsToIgnore []string) {
Expand Down
Loading