diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index c45092b811a..e36c35924b1 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -107,6 +107,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index dced769ca78..08b1c59f4cc 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -57,6 +57,7 @@ func init() { servenv.OnParseFor("vtcombo", registerFlags) servenv.OnParseFor("vtctld", registerFlags) servenv.OnParseFor("vtgate", registerFlags) + servenv.OnParseFor("vtorc", registerFlags) } // KeyspaceInfo is a meta struct that contains metadata to give the diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 7343db2c0ab..220510e71f5 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -27,20 +27,20 @@ import ( "sync" "time" - "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" + "golang.org/x/sync/errgroup" + "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/event" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/events" "vitess.io/vitess/go/vt/topo/topoproto" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vterrors" ) const ( @@ -581,7 +581,6 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac span.Annotate("shard", shard) span.Annotate("num_cells", len(cells)) defer span.Finish() - ctx = trace.NewContext(ctx, span) var err error // The caller intents to all cells @@ -625,7 +624,7 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac case IsErrType(err, NoNode): // There is no shard replication for this shard in this cell. NOOP default: - rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetShardReplication(%v, %v, %v) failed.", cell, keyspace, shard))) + rec.RecordError(vterrors.Wrapf(err, "GetShardReplication(%v, %v, %v) failed.", cell, keyspace, shard)) return } }(cell) @@ -644,6 +643,76 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac return result, err } +// GetTabletsByShard returns the tablets in the given shard using all cells. +// It can return ErrPartialResult if it couldn't read all the cells, or all +// the individual tablets, in which case the result is valid, but partial. +func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) { + return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil) +} + +// GetTabletsByShardCell returns the tablets in the given shard. It can return +// ErrPartialResult if it couldn't read all the cells, or all the individual +// tablets, in which case the result is valid, but partial. +func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) { + span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell") + span.Annotate("keyspace", keyspace) + span.Annotate("shard", shard) + span.Annotate("num_cells", len(cells)) + defer span.Finish() + var err error + + if len(cells) == 0 { + cells, err = ts.GetCellInfoNames(ctx) + if err != nil { + return nil, err + } + if len(cells) == 0 { // Nothing to do + return nil, nil + } + } + + // Divide the concurrency limit by the number of cells. If there are more + // cells than the limit, default to concurrency of 1. + cellConcurrency := 1 + if len(cells) < DefaultConcurrency { + cellConcurrency = DefaultConcurrency / len(cells) + } + + mu := sync.Mutex{} + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(DefaultConcurrency) + + tablets := make([]*TabletInfo, 0, len(cells)) + var kss *KeyspaceShard + if keyspace != "" { + kss = &KeyspaceShard{ + Keyspace: keyspace, + Shard: shard, + } + } + options := &GetTabletsByCellOptions{ + Concurrency: cellConcurrency, + KeyspaceShard: kss, + } + for _, cell := range cells { + eg.Go(func() error { + t, err := ts.GetTabletsByCell(ctx, cell, options) + if err != nil { + return vterrors.Wrapf(err, "GetTabletsByCell for %v failed.", cell) + } + mu.Lock() + defer mu.Unlock() + tablets = append(tablets, t...) + return nil + }) + } + if err := eg.Wait(); err != nil { + log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err) + return tablets, NewError(PartialResult, shard) + } + return tablets, nil +} + // GetTabletMapForShard returns the tablets for a shard. It can return // ErrPartialResult if it couldn't read all the cells, or all // the individual tablets, in which case the map is valid, but partial. diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index a9e4c5962fa..7dccf3bf183 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -289,6 +289,9 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t type GetTabletsByCellOptions struct { // Concurrency controls the maximum number of concurrent calls to GetTablet. Concurrency int + // KeyspaceShard is the optional keyspace/shard that tablets must match. + // An empty shard value will match all shards in the keyspace. + KeyspaceShard *KeyspaceShard } // GetTabletsByCell returns all the tablets in the cell. @@ -316,15 +319,27 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G return nil, err } - tablets := make([]*TabletInfo, len(listResults)) + var capHint int + if opt != nil && opt.KeyspaceShard == nil { + capHint = len(listResults) + } + + tablets := make([]*TabletInfo, 0, capHint) for n := range listResults { tablet := &topodatapb.Tablet{} if err := tablet.UnmarshalVT(listResults[n].Value); err != nil { return nil, err } - tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version} + if opt != nil && opt.KeyspaceShard != nil && opt.KeyspaceShard.Keyspace != "" { + if opt.KeyspaceShard.Keyspace != tablet.Keyspace { + continue + } + if opt.KeyspaceShard.Shard != "" && opt.KeyspaceShard.Shard != tablet.Shard { + continue + } + } + tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version}) } - return tablets, nil } diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index 3a0153a11b5..e659a0d01b9 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -17,8 +17,10 @@ limitations under the License. package topo_test import ( + "cmp" "context" "errors" + "slices" "testing" "github.com/stretchr/testify/assert" @@ -34,43 +36,296 @@ import ( // GetTabletsByCell first tries to get all the tablets using List. // If the response is too large, we will get an error, and fall back to one tablet at a time. func TestServerGetTabletsByCell(t *testing.T) { + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + tests := []struct { - name string - tablets int - opt *topo.GetTabletsByCellOptions - listError error + name string + createShardTablets int + expectedTablets []*topodatapb.Tablet + opt *topo.GetTabletsByCellOptions + listError error + keyspaceShards []*topo.KeyspaceShard }{ { - name: "negative concurrency", - tablets: 1, + name: "negative concurrency", + keyspaceShards: []*topo.KeyspaceShard{ + {Keyspace: keyspace, Shard: shard}, + }, + createShardTablets: 1, + expectedTablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(1), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(1), + }, + Keyspace: keyspace, + Shard: shard, + }, + }, // Ensure this doesn't panic. opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, }, { - name: "single", - tablets: 1, + name: "single", + keyspaceShards: []*topo.KeyspaceShard{ + {Keyspace: keyspace, Shard: shard}, + }, + createShardTablets: 1, + expectedTablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(1), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(1), + }, + Keyspace: keyspace, + Shard: shard, + }, + }, // Make sure the defaults apply as expected. opt: nil, }, { name: "multiple", - // should work with more than 1 tablet - tablets: 32, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + keyspaceShards: []*topo.KeyspaceShard{ + {Keyspace: keyspace, Shard: shard}, + }, + // Should work with more than 1 tablet + createShardTablets: 4, + expectedTablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(1), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(1), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(2), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(2), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(3), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(3), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(4), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(4), + }, + Keyspace: keyspace, + Shard: shard, + }, + }, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, }, { name: "multiple with list error", - // should work with more than 1 tablet when List returns an error - tablets: 32, + keyspaceShards: []*topo.KeyspaceShard{ + {Keyspace: keyspace, Shard: shard}, + }, + // Should work with more than 1 tablet when List returns an error + createShardTablets: 4, + expectedTablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(1), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(1), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(2), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(2), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(3), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(3), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(4), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(4), + }, + Keyspace: keyspace, + Shard: shard, + }, + }, opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, listError: topo.NewError(topo.ResourceExhausted, ""), }, + { + name: "filtered by keyspace and shard", + keyspaceShards: []*topo.KeyspaceShard{ + {Keyspace: keyspace, Shard: shard}, + {Keyspace: "filtered", Shard: "-"}, + }, + // Should create 2 tablets in 2 different shards (4 total) + // but only a single shard is returned + createShardTablets: 2, + expectedTablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(1), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(1), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(2), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(2), + }, + Keyspace: keyspace, + Shard: shard, + }, + }, + opt: &topo.GetTabletsByCellOptions{ + Concurrency: 1, + KeyspaceShard: &topo.KeyspaceShard{ + Keyspace: keyspace, + Shard: shard, + }, + }, + }, + { + name: "filtered by keyspace and no shard", + keyspaceShards: []*topo.KeyspaceShard{ + {Keyspace: keyspace, Shard: shard}, + {Keyspace: keyspace, Shard: shard + "2"}, + }, + // Should create 2 tablets in 2 different shards (4 total) + // in the same keyspace and both shards are returned due to + // empty shard + createShardTablets: 2, + expectedTablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(1), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(1), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(2), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(2), + }, + Keyspace: keyspace, + Shard: shard, + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(3), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(3), + }, + Keyspace: keyspace, + Shard: shard + "2", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(4), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(4), + }, + Keyspace: keyspace, + Shard: shard + "2", + }, + }, + opt: &topo.GetTabletsByCellOptions{ + Concurrency: 1, + KeyspaceShard: &topo.KeyspaceShard{ + Keyspace: keyspace, + Shard: "", + }, + }, + }, } - const cell = "zone1" - const keyspace = "keyspace" - const shard = "shard" - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -84,37 +339,53 @@ func TestServerGetTabletsByCell(t *testing.T) { // Create an ephemeral keyspace and generate shard records within // the keyspace to fetch later. - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablets := make([]*topo.TabletInfo, tt.tablets) + createdKeyspaces := make(map[string]bool, len(tt.keyspaceShards)) + for _, kss := range tt.keyspaceShards { + if !createdKeyspaces[kss.Keyspace] { + require.NoError(t, ts.CreateKeyspace(ctx, kss.Keyspace, &topodatapb.Keyspace{})) + createdKeyspaces[kss.Keyspace] = true + } + require.NoError(t, ts.CreateShard(ctx, kss.Keyspace, kss.Shard)) + } - for i := 0; i < tt.tablets; i++ { - tablet := &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: cell, - Uid: uint32(i), - }, - Hostname: "host1", - PortMap: map[string]int32{ - "vt": int32(i), - }, - Keyspace: keyspace, - Shard: shard, + var uid uint32 = 1 + for _, kss := range tt.keyspaceShards { + for i := 0; i < tt.createShardTablets; i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uid, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(uid), + }, + Keyspace: kss.Keyspace, + Shard: kss.Shard, + } + require.NoError(t, ts.CreateTablet(ctx, tablet)) + uid++ } - tInfo := &topo.TabletInfo{Tablet: tablet} - tablets[i] = tInfo - require.NoError(t, ts.CreateTablet(ctx, tablet)) } // Verify that we return a complete list of tablets and that each // tablet matches what we expect. out, err := ts.GetTabletsByCell(ctx, cell, tt.opt) require.NoError(t, err) - require.Len(t, out, tt.tablets) + require.Len(t, out, len(tt.expectedTablets)) + + slices.SortFunc(out, func(i, j *topo.TabletInfo) int { + return cmp.Compare(i.Alias.Uid, j.Alias.Uid) + }) + slices.SortFunc(tt.expectedTablets, func(i, j *topodatapb.Tablet) int { + return cmp.Compare(i.Alias.Uid, j.Alias.Uid) + }) - for i, tab := range tablets { - require.Equal(t, tab.Tablet, tablets[i].Tablet) + for i, tablet := range out { + expected := tt.expectedTablets[i] + require.Equal(t, expected.Alias.String(), tablet.Alias.String()) + require.Equal(t, expected.Keyspace, tablet.Keyspace) + require.Equal(t, expected.Shard, tablet.Shard) } }) } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index cd112481edc..73cd61676ca 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -37,7 +37,6 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/reparentutil" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" @@ -203,7 +202,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { } func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { - tablets, err := topotools.GetTabletMapForCell(ctx, ts, cell) + tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) if err != nil { log.Errorf("Error fetching topo info for cell %v: %v", cell, err) return @@ -235,7 +234,7 @@ func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) { } func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { - tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard) + tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard) if err != nil { log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err) return @@ -245,7 +244,7 @@ func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore) } -func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { +func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { // Discover new tablets. latestInstances := make(map[string]bool) var wg sync.WaitGroup