Skip to content

Commit d0a3817

Browse files
support cells > concurrency limit
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
1 parent e5bf0df commit d0a3817

File tree

1 file changed

+22
-8
lines changed

1 file changed

+22
-8
lines changed

go/vt/topo/shard.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,20 @@ import (
2727
"sync"
2828
"time"
2929

30-
"vitess.io/vitess/go/constants/sidecar"
31-
"vitess.io/vitess/go/protoutil"
32-
"vitess.io/vitess/go/vt/proto/vtrpc"
33-
"vitess.io/vitess/go/vt/vterrors"
30+
"golang.org/x/sync/semaphore"
3431

32+
"vitess.io/vitess/go/constants/sidecar"
3533
"vitess.io/vitess/go/event"
34+
"vitess.io/vitess/go/protoutil"
3635
"vitess.io/vitess/go/trace"
3736
"vitess.io/vitess/go/vt/concurrency"
3837
"vitess.io/vitess/go/vt/key"
3938
"vitess.io/vitess/go/vt/log"
39+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
40+
"vitess.io/vitess/go/vt/proto/vtrpc"
4041
"vitess.io/vitess/go/vt/topo/events"
4142
"vitess.io/vitess/go/vt/topo/topoproto"
42-
43-
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
43+
"vitess.io/vitess/go/vt/vterrors"
4444
)
4545

4646
const (
@@ -670,16 +670,30 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
670670
}
671671
}
672672

673+
// divide the concurrency limit by the number of cells. if there are more
674+
// cells than the limit, default to concurrency of 1. A semaphore ensures
675+
// the limit is not exceeded in this scenario.
676+
sem := semaphore.NewWeighted(int64(DefaultConcurrency))
677+
getConcurrency := DefaultConcurrency / len(cells)
678+
if getConcurrency == 0 {
679+
getConcurrency = 1
680+
}
681+
673682
wg := sync.WaitGroup{}
674683
mutex := sync.Mutex{}
675684
rec := concurrency.AllErrorRecorder{}
676685
tablets := make([]*TabletInfo, 0)
677-
concurrency := DefaultConcurrency / len(cells)
678686
for _, cell := range cells {
679687
wg.Add(1)
680688
go func() {
689+
if err := sem.Acquire(ctx, 1); err != nil {
690+
rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)))
691+
return
692+
}
693+
defer sem.Release(1)
694+
681695
t, err := ts.GetTabletsByCell(ctx, cell, &GetTabletsByCellOptions{
682-
Concurrency: concurrency,
696+
Concurrency: getConcurrency,
683697
Keyspace: keyspace,
684698
Shard: shard,
685699
})

0 commit comments

Comments
 (0)