Skip to content

Commit 45c2199

Browse files
authored
add keyrange support for vtorc clusters_to_watch (#457)
* add keyrange support for vtorc clusters_to_watch Signed-off-by: Priya Bibra <pbibra@slack-corp.com> * update help txt Signed-off-by: Priya Bibra <pbibra@slack-corp.com> * add unit tests Signed-off-by: Priya Bibra <pbibra@slack-corp.com> * more test cases Signed-off-by: Priya Bibra <pbibra@slack-corp.com> * update assert Signed-off-by: Priya Bibra <pbibra@slack-corp.com> * trigger re-build Signed-off-by: Priya Bibra <pbibra@slack-corp.com> --------- Signed-off-by: Priya Bibra <pbibra@slack-corp.com>
1 parent a0a9e11 commit 45c2199

File tree

3 files changed

+205
-28
lines changed

3 files changed

+205
-28
lines changed

go/flags/endtoend/vtorc.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ Flags:
2525
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
2626
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
2727
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
28-
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
28+
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
2929
--config string config file name
3030
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
3131
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)

go/vt/vtorc/logic/tablet_discovery.go

Lines changed: 79 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"maps"
2324
"slices"
2425
"strings"
2526
"sync"
@@ -31,6 +32,7 @@ import (
3132
"google.golang.org/protobuf/encoding/prototext"
3233
"google.golang.org/protobuf/proto"
3334

35+
"vitess.io/vitess/go/vt/discovery"
3436
"vitess.io/vitess/go/vt/external/golib/sqlutils"
3537
"vitess.io/vitess/go/vt/log"
3638
"vitess.io/vitess/go/vt/topo"
@@ -57,7 +59,7 @@ var (
5759

5860
// RegisterFlags registers the flags required by VTOrc
5961
func RegisterFlags(fs *pflag.FlagSet) {
60-
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
62+
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
6163
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
6264
}
6365

@@ -81,6 +83,76 @@ func refreshAllTablets() {
8183
}, false /* forceRefresh */)
8284
}
8385

86+
// getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards.
87+
// This handles both individual shards or key ranges using TabletFilter from the discovery package.
88+
func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) {
89+
// Parse input and build list of keyspaces / shards
90+
var keyspaceShards []*topo.KeyspaceShard
91+
92+
keyspaces := make(map[string]map[string]string)
93+
filters := make(map[string][]string)
94+
95+
keyspaces["ranged"] = map[string]string{}
96+
keyspaces["full"] = map[string]string{}
97+
98+
for _, ks := range clustersToWatch {
99+
if strings.Contains(ks, "/") {
100+
// This is a keyspace/shard specification
101+
input := strings.Split(ks, "/")
102+
keyspaces["ranged"][input[0]] = "ranged"
103+
// filter creation expects a pipe separator between keyspace and shard
104+
filters[input[0]] = append(filters[input[0]], fmt.Sprintf("%s|%s", input[0], input[1]))
105+
106+
} else {
107+
keyspaces["full"][ks] = "full"
108+
}
109+
}
110+
111+
// Copy function will combine the two maps. It will override any keyspaces in ranged that also exist in full with the
112+
// full designation because we assume that the full keyspace will take precedence over a keyspace/shard specification within the same input.
113+
// e.g. If the clustersToWatch is `ks1,...ks1/10-20`, all tablets in ks1 should be watched.
114+
maps.Copy(keyspaces["ranged"], keyspaces["full"])
115+
116+
if len(keyspaces["ranged"]) > 0 {
117+
for ks, filterType := range keyspaces["ranged"] {
118+
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
119+
defer cancel()
120+
121+
shards, err := ts.GetShardNames(ctx, ks)
122+
if err != nil {
123+
// Log the errr and continue
124+
log.Errorf("Error fetching shards for keyspace: %v", ks)
125+
continue
126+
}
127+
128+
if len(shards) == 0 {
129+
log.Errorf("Topo has no shards for ks: %v", ks)
130+
continue
131+
}
132+
133+
if filterType == "ranged" {
134+
shardFilter, err := discovery.NewFilterByShard(filters[ks])
135+
if err != nil {
136+
log.Error(err)
137+
return keyspaceShards, err
138+
}
139+
140+
for _, s := range shards {
141+
if shardFilter.IsIncluded(&topodatapb.Tablet{Keyspace: ks, Shard: s}) {
142+
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
143+
}
144+
}
145+
} else {
146+
for _, s := range shards {
147+
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
148+
}
149+
}
150+
}
151+
}
152+
153+
return keyspaceShards, nil
154+
}
155+
84156
func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
85157
if !IsLeaderOrActive() {
86158
return
@@ -106,33 +178,13 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
106178
}
107179
wg.Wait()
108180
} else {
109-
// Parse input and build list of keyspaces / shards
110-
var keyspaceShards []*topo.KeyspaceShard
111-
for _, ks := range clustersToWatch {
112-
if strings.Contains(ks, "/") {
113-
// This is a keyspace/shard specification
114-
input := strings.Split(ks, "/")
115-
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
116-
} else {
117-
// Assume this is a keyspace and find all shards in keyspace
118-
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
119-
defer cancel()
120-
shards, err := ts.GetShardNames(ctx, ks)
121-
if err != nil {
122-
// Log the errr and continue
123-
log.Errorf("Error fetching shards for keyspace: %v", ks)
124-
continue
125-
}
126-
if len(shards) == 0 {
127-
log.Errorf("Topo has no shards for ks: %v", ks)
128-
continue
129-
}
130-
for _, s := range shards {
131-
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
132-
}
133-
}
181+
keyspaceShards, err := getKeyspaceShardsToWatch()
182+
if err != nil {
183+
log.Error(err)
184+
return
134185
}
135-
if len(keyspaceShards) == 0 {
186+
187+
if len(keyspaceShards) == 0 || keyspaceShards == nil {
136188
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
137189
return
138190
}

go/vt/vtorc/logic/tablet_discovery_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,131 @@ func TestShardPrimary(t *testing.T) {
277277
}
278278
}
279279

280+
func TestGetKeyspaceShardsToWatch(t *testing.T) {
281+
ctx, cancel := context.WithCancel(context.Background())
282+
defer cancel()
283+
284+
ts = memorytopo.NewServer(ctx, "test_cell")
285+
286+
keyspaces := []string{"test_keyspace", "test_keyspace2", "test_keyspace3", "test_keyspace4"}
287+
for _, k := range keyspaces {
288+
if err := ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{}); err != nil {
289+
t.Fatalf("cannot create keyspace: %v", err)
290+
}
291+
}
292+
293+
shards1 := []string{"-40", "40-50", "50-60", "60-70", "70-80", "80-"}
294+
shards2 := []string{"-1000", "1000-1100", "1100-1200", "1200-1300", "1300-"}
295+
296+
for _, shard := range shards1 {
297+
if err := ts.CreateShard(ctx, keyspaces[0], shard); err != nil {
298+
t.Fatalf("cannot create shard: %v", err)
299+
}
300+
}
301+
302+
for _, shard := range shards2 {
303+
if err := ts.CreateShard(ctx, keyspaces[1], shard); err != nil {
304+
t.Fatalf("cannot create shard: %v", err)
305+
}
306+
}
307+
308+
if err := ts.CreateShard(ctx, keyspaces[2], "-"); err != nil {
309+
t.Fatalf("cannot create shard: %v", err)
310+
}
311+
312+
if err := ts.CreateShard(ctx, keyspaces[3], "0"); err != nil {
313+
t.Fatalf("cannot create shard: %v", err)
314+
}
315+
316+
testcases := []*struct {
317+
name string
318+
clusters []string
319+
expected []*topo.KeyspaceShard
320+
}{
321+
{
322+
name: "single shard and range",
323+
clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0]), fmt.Sprintf("%s/60-80", keyspaces[0])},
324+
expected: []*topo.KeyspaceShard{
325+
{Keyspace: keyspaces[0], Shard: "40-50"},
326+
{Keyspace: keyspaces[0], Shard: "60-70"},
327+
{Keyspace: keyspaces[0], Shard: "70-80"},
328+
},
329+
}, {
330+
name: "single shard",
331+
clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0])},
332+
expected: []*topo.KeyspaceShard{{Keyspace: keyspaces[0], Shard: "40-50"}},
333+
}, {
334+
name: "full keyspace",
335+
clusters: []string{keyspaces[0]},
336+
expected: []*topo.KeyspaceShard{
337+
{Keyspace: keyspaces[0], Shard: "-40"},
338+
{Keyspace: keyspaces[0], Shard: "40-50"},
339+
{Keyspace: keyspaces[0], Shard: "50-60"},
340+
{Keyspace: keyspaces[0], Shard: "60-70"},
341+
{Keyspace: keyspaces[0], Shard: "70-80"},
342+
{Keyspace: keyspaces[0], Shard: "80-"},
343+
},
344+
}, {
345+
name: "full keyspace with keyrange",
346+
clusters: []string{keyspaces[0], fmt.Sprintf("%s/60-80", keyspaces[0])},
347+
expected: []*topo.KeyspaceShard{
348+
{Keyspace: keyspaces[0], Shard: "-40"},
349+
{Keyspace: keyspaces[0], Shard: "40-50"},
350+
{Keyspace: keyspaces[0], Shard: "50-60"},
351+
{Keyspace: keyspaces[0], Shard: "60-70"},
352+
{Keyspace: keyspaces[0], Shard: "70-80"},
353+
{Keyspace: keyspaces[0], Shard: "80-"},
354+
},
355+
}, {
356+
name: "multi keyspace",
357+
clusters: []string{keyspaces[0], fmt.Sprintf("%s/1100-1300", keyspaces[1])},
358+
expected: []*topo.KeyspaceShard{
359+
{Keyspace: keyspaces[1], Shard: "1100-1200"},
360+
{Keyspace: keyspaces[1], Shard: "1200-1300"},
361+
{Keyspace: keyspaces[0], Shard: "-40"},
362+
{Keyspace: keyspaces[0], Shard: "40-50"},
363+
{Keyspace: keyspaces[0], Shard: "50-60"},
364+
{Keyspace: keyspaces[0], Shard: "60-70"},
365+
{Keyspace: keyspaces[0], Shard: "70-80"},
366+
{Keyspace: keyspaces[0], Shard: "80-"},
367+
},
368+
}, {
369+
name: "partial success with non-existent shard",
370+
clusters: []string{"non-existent/10-20", fmt.Sprintf("%s/1100-1300", keyspaces[1])},
371+
expected: []*topo.KeyspaceShard{
372+
{Keyspace: keyspaces[1], Shard: "1100-1200"},
373+
{Keyspace: keyspaces[1], Shard: "1200-1300"},
374+
},
375+
}, {
376+
name: "empty result",
377+
clusters: []string{"non-existent/10-20"},
378+
expected: nil,
379+
}, {
380+
name: "single keyspace -",
381+
clusters: []string{keyspaces[2]},
382+
expected: []*topo.KeyspaceShard{
383+
{Keyspace: keyspaces[2], Shard: "-"},
384+
},
385+
}, {
386+
name: "single keyspace 0",
387+
clusters: []string{keyspaces[3]},
388+
expected: []*topo.KeyspaceShard{
389+
{Keyspace: keyspaces[3], Shard: "0"},
390+
},
391+
},
392+
}
393+
394+
for _, testcase := range testcases {
395+
t.Run(testcase.name, func(t *testing.T) {
396+
clustersToWatch = testcase.clusters
397+
res, err := getKeyspaceShardsToWatch()
398+
399+
assert.NoError(t, err)
400+
assert.EqualValues(t, testcase.expected, res)
401+
})
402+
}
403+
}
404+
280405
// verifyRefreshTabletsInKeyspaceShard calls refreshTabletsInKeyspaceShard with the forceRefresh parameter provided and verifies that
281406
// the number of instances refreshed matches the parameter and all the tablets match the ones provided
282407
func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet, tabletsToIgnore []string) {

0 commit comments

Comments
 (0)