Skip to content

Commit 81bc0d9

Browse files
Ensure all topo read calls consider --topo_read_concurrency
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
1 parent 6f406db commit 81bc0d9

File tree

16 files changed

+82
-100
lines changed

16 files changed

+82
-100
lines changed

go/flags/endtoend/vtbackup.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ Flags:
235235
--topo_global_root string the path of the global topology data in the global topology server
236236
--topo_global_server_address string the address of the global topology server
237237
--topo_implementation string the topology implementation to use
238+
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
238239
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
239240
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
240241
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ Flags:
379379
--topo_global_root string the path of the global topology data in the global topology server
380380
--topo_global_server_address string the address of the global topology server
381381
--topo_implementation string the topology implementation to use
382-
--topo_read_concurrency int Concurrency of topo reads. (default 32)
382+
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
383383
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
384384
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
385385
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)

go/flags/endtoend/vtctld.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ Flags:
169169
--topo_global_root string the path of the global topology data in the global topology server
170170
--topo_global_server_address string the address of the global topology server
171171
--topo_implementation string the topology implementation to use
172-
--topo_read_concurrency int Concurrency of topo reads. (default 32)
172+
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
173173
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
174174
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
175175
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)

go/flags/endtoend/vtgate.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ Flags:
227227
--topo_global_root string the path of the global topology data in the global topology server
228228
--topo_global_server_address string the address of the global topology server
229229
--topo_implementation string the topology implementation to use
230-
--topo_read_concurrency int Concurrency of topo reads. (default 32)
230+
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
231231
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
232232
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
233233
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)

go/flags/endtoend/vtorc.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ Flags:
107107
--topo_global_root string the path of the global topology data in the global topology server
108108
--topo_global_server_address string the address of the global topology server
109109
--topo_implementation string the topology implementation to use
110-
--topo_read_concurrency int Concurrency of topo reads. (default 32)
110+
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
111111
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
112112
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
113113
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)

go/flags/endtoend/vttablet.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ Flags:
381381
--topo_global_root string the path of the global topology data in the global topology server
382382
--topo_global_server_address string the address of the global topology server
383383
--topo_implementation string the topology implementation to use
384+
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
384385
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
385386
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
386387
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)

go/vt/discovery/healthcheck.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
382382
if c == "" {
383383
continue
384384
}
385-
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
385+
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultReadConcurrency))
386386
}
387387

388388
hc.topoWatchers = topoWatchers

go/vt/discovery/topology_watcher.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,13 @@ import (
2626
"sync"
2727
"time"
2828

29-
"vitess.io/vitess/go/vt/topo/topoproto"
30-
31-
"vitess.io/vitess/go/vt/key"
32-
3329
"vitess.io/vitess/go/stats"
3430
"vitess.io/vitess/go/trace"
35-
31+
"vitess.io/vitess/go/vt/key"
3632
"vitess.io/vitess/go/vt/log"
37-
"vitess.io/vitess/go/vt/proto/topodata"
33+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3834
"vitess.io/vitess/go/vt/topo"
35+
"vitess.io/vitess/go/vt/topo/topoproto"
3936
)
4037

4138
const (
@@ -56,7 +53,7 @@ var (
5653
// tabletInfo is used internally by the TopologyWatcher struct.
5754
type tabletInfo struct {
5855
alias string
59-
tablet *topodata.Tablet
56+
tablet *topodatapb.Tablet
6057
}
6158

6259
// TopologyWatcher polls the topology periodically for changes to
@@ -70,7 +67,7 @@ type TopologyWatcher struct {
7067
cell string
7168
refreshInterval time.Duration
7269
refreshKnownTablets bool
73-
concurrency int
70+
concurrency int64
7471
ctx context.Context
7572
cancelFunc context.CancelFunc
7673
// wg keeps track of all launched Go routines.
@@ -92,7 +89,7 @@ type TopologyWatcher struct {
9289

9390
// NewTopologyWatcher returns a TopologyWatcher that monitors all
9491
// the tablets in a cell, and reloads them as needed.
95-
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
92+
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher {
9693
tw := &TopologyWatcher{
9794
topoServer: topoServer,
9895
healthcheck: hc,
@@ -112,7 +109,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
112109
}
113110

114111
func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
115-
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
112+
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
116113
}
117114

118115
// Start starts the topology watcher.
@@ -271,14 +268,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
271268
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
272269
type TabletFilter interface {
273270
// IsIncluded returns whether tablet is included in this filter
274-
IsIncluded(tablet *topodata.Tablet) bool
271+
IsIncluded(tablet *topodatapb.Tablet) bool
275272
}
276273

277274
// TabletFilters contains filters for tablets.
278275
type TabletFilters []TabletFilter
279276

280277
// IsIncluded returns true if a tablet passes all filters.
281-
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
278+
func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool {
282279
for _, filter := range tf {
283280
if !filter.IsIncluded(tablet) {
284281
return false
@@ -299,7 +296,7 @@ type FilterByShard struct {
299296
type filterShard struct {
300297
keyspace string
301298
shard string
302-
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
299+
keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange
303300
}
304301

305302
// NewFilterByShard creates a new FilterByShard for use by a
@@ -344,7 +341,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
344341
}
345342

346343
// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
347-
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
344+
func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool {
348345
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
349346
if err != nil {
350347
log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err)
@@ -384,7 +381,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
384381
}
385382

386383
// IsIncluded returns true if the tablet's keyspace matches what we have.
387-
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
384+
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool {
388385
_, exist := fbk.keyspaces[tablet.Keyspace]
389386
return exist
390387
}
@@ -403,7 +400,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
403400
}
404401

405402
// IsIncluded returns true if the tablet's tags match what we expect.
406-
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
403+
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool {
407404
if fbtg.tags == nil {
408405
return true
409406
}

go/vt/topo/keyspace.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,44 +22,26 @@ import (
2222
"sort"
2323
"sync"
2424

25-
"github.com/spf13/pflag"
2625
"golang.org/x/sync/errgroup"
2726

2827
"vitess.io/vitess/go/constants/sidecar"
28+
"vitess.io/vitess/go/event"
2929
"vitess.io/vitess/go/sqlescape"
3030
"vitess.io/vitess/go/vt/key"
31-
"vitess.io/vitess/go/vt/servenv"
32-
"vitess.io/vitess/go/vt/vterrors"
33-
34-
"vitess.io/vitess/go/event"
3531
"vitess.io/vitess/go/vt/log"
36-
"vitess.io/vitess/go/vt/topo/events"
37-
3832
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3933
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
34+
"vitess.io/vitess/go/vt/topo/events"
35+
"vitess.io/vitess/go/vt/vterrors"
4036
)
4137

4238
// This file contains keyspace utility functions.
4339

44-
// Default concurrency to use in order to avoid overwhelming the topo server.
45-
var DefaultConcurrency = 32
46-
4740
// shardKeySuffix is the suffix of a shard key.
4841
// The full key looks like this:
4942
// /vitess/global/keyspaces/customer/shards/80-/Shard
5043
const shardKeySuffix = "Shard"
5144

52-
func registerFlags(fs *pflag.FlagSet) {
53-
fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.")
54-
}
55-
56-
func init() {
57-
servenv.OnParseFor("vtcombo", registerFlags)
58-
servenv.OnParseFor("vtctld", registerFlags)
59-
servenv.OnParseFor("vtgate", registerFlags)
60-
servenv.OnParseFor("vtorc", registerFlags)
61-
}
62-
6345
// KeyspaceInfo is a meta struct that contains metadata to give the
6446
// data more context and convenience. This is the main way we interact
6547
// with a keyspace.
@@ -198,7 +180,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
198180
type FindAllShardsInKeyspaceOptions struct {
199181
// Concurrency controls the maximum number of concurrent calls to GetShard.
200182
// If <= 0, Concurrency is set to 1.
201-
Concurrency int
183+
Concurrency int64
202184
}
203185

204186
// FindAllShardsInKeyspace reads and returns all the existing shards in a
@@ -212,7 +194,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
212194
opt = &FindAllShardsInKeyspaceOptions{}
213195
}
214196
if opt.Concurrency <= 0 {
215-
opt.Concurrency = DefaultConcurrency
197+
opt.Concurrency = DefaultReadConcurrency
216198
}
217199

218200
// Unescape the keyspace name as this can e.g. come from the VSchema where

go/vt/topo/server.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"sync"
4949

5050
"github.com/spf13/pflag"
51+
"golang.org/x/sync/semaphore"
5152

5253
"vitess.io/vitess/go/vt/log"
5354
"vitess.io/vitess/go/vt/proto/topodata"
@@ -141,6 +142,10 @@ type Server struct {
141142
// will read the list of addresses for that cell from the
142143
// global cluster and create clients as needed.
143144
cellConns map[string]cellConn
145+
146+
// cellReadSem is a semaphore limiting the number of concurrent read
147+
// operations to all cell-based topos.
148+
cellReadSem *semaphore.Weighted
144149
}
145150

146151
type cellConn struct {
@@ -175,6 +180,9 @@ var (
175180

176181
FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate",
177182
"vtorc", "vtbackup"}
183+
184+
// Default read concurrency to use in order to avoid overwhelming the topo server.
185+
DefaultReadConcurrency int64 = 32
178186
)
179187

180188
func init() {
@@ -187,6 +195,7 @@ func registerTopoFlags(fs *pflag.FlagSet) {
187195
fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use")
188196
fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server")
189197
fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server")
198+
fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads.")
190199
}
191200

192201
// RegisterFactory registers a Factory for an implementation for a Server.
@@ -202,19 +211,20 @@ func RegisterFactory(name string, factory Factory) {
202211
// NewWithFactory creates a new Server based on the given Factory.
203212
// It also opens the global cell connection.
204213
func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error) {
214+
globalReadSem := semaphore.NewWeighted(DefaultReadConcurrency)
205215
conn, err := factory.Create(GlobalCell, serverAddress, root)
206216
if err != nil {
207217
return nil, err
208218
}
209-
conn = NewStatsConn(GlobalCell, conn)
219+
conn = NewStatsConn(GlobalCell, conn, globalReadSem)
210220

211221
var connReadOnly Conn
212222
if factory.HasGlobalReadOnlyCell(serverAddress, root) {
213223
connReadOnly, err = factory.Create(GlobalReadOnlyCell, serverAddress, root)
214224
if err != nil {
215225
return nil, err
216226
}
217-
connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly)
227+
connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly, globalReadSem)
218228
} else {
219229
connReadOnly = conn
220230
}
@@ -224,6 +234,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error
224234
globalReadOnlyCell: connReadOnly,
225235
factory: factory,
226236
cellConns: make(map[string]cellConn),
237+
cellReadSem: semaphore.NewWeighted(DefaultReadConcurrency),
227238
}, nil
228239
}
229240

@@ -295,7 +306,7 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) {
295306
conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root)
296307
switch {
297308
case err == nil:
298-
conn = NewStatsConn(cell, conn)
309+
conn = NewStatsConn(cell, conn, ts.cellReadSem)
299310
ts.cellConns[cell] = cellConn{ci, conn}
300311
return conn, nil
301312
case IsErrType(err, NoNode):

go/vt/topo/shard.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -671,16 +671,8 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
671671
}
672672
}
673673

674-
// Divide the concurrency limit by the number of cells. If there are more
675-
// cells than the limit, default to concurrency of 1.
676-
cellConcurrency := 1
677-
if len(cells) < DefaultConcurrency {
678-
cellConcurrency = DefaultConcurrency / len(cells)
679-
}
680-
681674
mu := sync.Mutex{}
682675
eg, ctx := errgroup.WithContext(ctx)
683-
eg.SetLimit(DefaultConcurrency)
684676

685677
tablets := make([]*TabletInfo, 0, len(cells))
686678
var kss *KeyspaceShard
@@ -691,7 +683,6 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
691683
}
692684
}
693685
options := &GetTabletsByCellOptions{
694-
Concurrency: cellConcurrency,
695686
KeyspaceShard: kss,
696687
}
697688
for _, cell := range cells {

0 commit comments

Comments
 (0)