Skip to content

Commit d55c5c2

Browse files
Use a new flag
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
1 parent fe2bdd8 commit d55c5c2

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

go/vt/discovery/healthcheck.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ var (
8181
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
8282
// topoReadConcurrency tells us how many topo reads are allowed in parallel
8383
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
84+
// tabletDiscoveryConcurrency tells us how many tablets can be discovered in parallel
85+
tabletDiscoveryConcurrency = flag.Int("tablet_discovery_concurrency", 1024, "concurrent tablet discoveries")
8486
)
8587

8688
// See the documentation for NewHealthCheck below for an explanation of these parameters.
@@ -324,7 +326,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
324326
} else if len(KeyspacesToWatch) > 0 {
325327
filter = NewFilterByKeyspace(KeyspacesToWatch)
326328
}
327-
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency))
329+
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency, *tabletDiscoveryConcurrency))
328330
}
329331

330332
hc.topoWatchers = topoWatchers

go/vt/discovery/topology_watcher.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ type TopologyWatcher struct {
7373
refreshKnownTablets bool
7474
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
7575
sem chan int
76+
tabletDiscoverySem chan int
7677
ctx context.Context
7778
cancelFunc context.CancelFunc
7879
// wg keeps track of all launched Go routines.
@@ -94,7 +95,7 @@ type TopologyWatcher struct {
9495

9596
// NewTopologyWatcher returns a TopologyWatcher that monitors all
9697
// the tablets in a cell, and starts refreshing.
97-
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
98+
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
9899
tw := &TopologyWatcher{
99100
topoServer: topoServer,
100101
tabletRecorder: tr,
@@ -104,6 +105,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR
104105
refreshKnownTablets: refreshKnownTablets,
105106
getTablets: getTablets,
106107
sem: make(chan int, topoReadConcurrency),
108+
tabletDiscoverySem: make(chan int, tabletDiscoveryConcurrency),
107109
tablets: make(map[string]*tabletInfo),
108110
}
109111
tw.firstLoadChan = make(chan struct{})
@@ -116,8 +118,8 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR
116118

117119
// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
118120
// the tablets in a cell, and starts refreshing.
119-
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
120-
return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
121+
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int) *TopologyWatcher {
122+
return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, tabletDiscoveryConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
121123
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
122124
})
123125
}
@@ -230,11 +232,11 @@ func (tw *TopologyWatcher) loadTablets() {
230232
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
231233
}
232234
} else {
233-
tw.sem <- 1 // Wait for active queue to drain.
235+
tw.tabletDiscoverySem <- 1 // Wait for active queue to drain.
234236
// This is a new tablet record, let's add it to the healthcheck
235237
tw.tabletRecorder.AddTablet(newVal.tablet)
236238
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
237-
<-tw.sem // Done; enable next request to run
239+
<-tw.tabletDiscoverySem // Done; enable next request to run
238240
}
239241
}
240242

0 commit comments

Comments
 (0)