Skip to content

Commit

Permalink
make semaphore conditional
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Oct 25, 2024
1 parent d0a3817 commit a8c8551
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,13 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
// divide the concurrency limit by the number of cells. if there are more
// cells than the limit, default to concurrency of 1. A semaphore ensures
// the limit is not exceeded in this scenario.
sem := semaphore.NewWeighted(int64(DefaultConcurrency))
getConcurrency := DefaultConcurrency / len(cells)
if getConcurrency == 0 {
var getConcurrency int
var sem *semaphore.Weighted
if len(cells) > DefaultConcurrency {
sem = semaphore.NewWeighted(int64(DefaultConcurrency))
getConcurrency = 1
} else {
getConcurrency = DefaultConcurrency / len(cells)
}

wg := sync.WaitGroup{}
Expand All @@ -686,11 +689,13 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
for _, cell := range cells {
wg.Add(1)
go func() {
if err := sem.Acquire(ctx, 1); err != nil {
rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)))
return
if sem != nil {
if err := sem.Acquire(ctx, 1); err != nil {
rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)))
return
}
defer sem.Release(1)
}
defer sem.Release(1)

t, err := ts.GetTabletsByCell(ctx, cell, &GetTabletsByCellOptions{
Concurrency: getConcurrency,
Expand Down

0 comments on commit a8c8551

Please sign in to comment.