Skip to content

Commit

Permalink
Make the concurrency factor a hard limit
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Jul 4, 2024
1 parent d25a8c8 commit aaf6954
Showing 1 changed file with 64 additions and 15 deletions.
79 changes: 64 additions & 15 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type tsoDispatcher struct {
pendingBatches sync.Map

updateConnectionCtxsCh chan struct{}
tsoReqTokenCh chan struct{}

dispatcherID string
}
Expand Down Expand Up @@ -123,6 +124,7 @@ func newTSODispatcher(
}},

updateConnectionCtxsCh: make(chan struct{}, 1),
tsoReqTokenCh: make(chan struct{}, 256),

dispatcherID: fmt.Sprintf("%d", dispatcherIDAlloc.Add(1)),
}
Expand Down Expand Up @@ -215,7 +217,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
stream *tsoStream
)

concurrencyFactor := option.getTSOClientConcurrencyFactor()
concurrencyFactor := 0
// Avoid loading from the dynamic options map too frequently.
lastUpdateConcurrencyFactorTime := time.Now()
const updateConcurrencyFactorInterval = time.Second * 5
Expand Down Expand Up @@ -257,22 +259,61 @@ tsoBatchLoop:
//}

currentBatchStartTime := time.Now()
if currentBatchStartTime.Sub(lastUpdateConcurrencyFactorTime) > updateConcurrencyFactorInterval {
if concurrencyFactor == 0 || currentBatchStartTime.Sub(lastUpdateConcurrencyFactorTime) > updateConcurrencyFactorInterval {
lastUpdateConcurrencyFactorTime = currentBatchStartTime
concurrencyFactor = option.getTSOClientConcurrencyFactor()
newConcurrencyFactor := option.getTSOClientConcurrencyFactor()
// Adjust available tokens count
if newConcurrencyFactor > concurrencyFactor {
for concurrencyFactor < newConcurrencyFactor {
td.tsoReqTokenCh <- struct{}{}
concurrencyFactor++
}
} else if newConcurrencyFactor < concurrencyFactor {
for concurrencyFactor > newConcurrencyFactor {
select {
case <-ctx.Done():
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
return
case <-td.tsoReqTokenCh:
}
concurrencyFactor--
}
}
}

batchController = td.batchBufferPool.Get().(*tsoBatchController)
batchController.collectedRequestCount = 0
// Receive the first request
select {
case <-ctx.Done():
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
return
case firstRequest := <-td.reqChan:
batchController.batchStartTime = time.Now()
batchController.pushRequest(firstRequest)
// Receive until the first request arrives AND a token is ready.
for {
select {
case <-ctx.Done():
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
return
case firstRequest := <-td.reqChan:
batchController.batchStartTime = time.Now()
batchController.pushRequest(firstRequest)
// Token is not ready. Continue the loop to wait for the token or another request.
continue
case <-td.tsoReqTokenCh:
}

// A token is ready. If the first request didn't arrive, wait for it.
if batchController.collectedRequestCount == 0 {
select {
case <-ctx.Done():
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
return
case firstRequest := <-td.reqChan:
batchController.batchStartTime = time.Now()
batchController.pushRequest(firstRequest)
// Token is not ready. Continue the loop to wait for the token or another request.
}
}

break
}

//if maxBatchWaitInterval >= 0 {
Expand Down Expand Up @@ -305,15 +346,15 @@ tsoBatchLoop:
select {
case <-ctx.Done():
// Finish the collected requests if the context is canceled.
batchController.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err()))
td.cancelCollectedRequests(batchController, errors.WithStack(err))
timer.Stop()
return
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
svcDiscovery.ScheduleCheckMemberChanged()
// Finish the collected requests if the stream is failed to be created.
batchController.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
td.cancelCollectedRequests(batchController, errors.WithStack(err))
timer.Stop()
continue tsoBatchLoop
case <-timer.C:
Expand Down Expand Up @@ -535,13 +576,19 @@ func (td *tsoDispatcher) processRequests(
dcLocation, count, tbc.batchStartTime)
if err != nil {
td.pendingBatches.Delete(batchedReqID)
tbc.finishCollectedRequests(0, 0, 0, err)
td.cancelCollectedRequests(tbc, err)
return err
}

return nil
}

func (td *tsoDispatcher) cancelCollectedRequests(tbc *tsoBatchController, err error) {
tbc.finishCollectedRequests(0, 0, 0, err)
// Release a token.
td.tsoReqTokenCh <- struct{}{}
}

func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestResult, err error, statFunc func(latency time.Duration)) {
tbc, loaded := td.pendingBatches.LoadAndDelete(reqID)
if !loaded {
Expand All @@ -563,6 +610,8 @@ func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestRe
finishCollectedRequests(typedTbc.getCollectedRequests(), result.physical, firstLogical, result.suffixBits, err, statFunc)
typedTbc.collectedRequestCount = 0
td.batchBufferPool.Put(typedTbc)
// Release a token.
td.tsoReqTokenCh <- struct{}{}
}

func (td *tsoDispatcher) revokePendingRequests(err error) {
Expand Down

0 comments on commit aaf6954

Please sign in to comment.