Fix latency measurement
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
MyonKeminta committed Jul 2, 2024
1 parent 838da9a commit 770c19b
Showing 3 changed files with 16 additions and 8 deletions.
client/tso_batch_controller.go (5 additions, 2 deletions)

@@ -136,19 +136,22 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
 }

 func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
-    finishCollectedRequests(tbc.collectedRequests[:tbc.collectedRequestCount], physical, firstLogical, suffixBits, err)
+    finishCollectedRequests(tbc.collectedRequests[:tbc.collectedRequestCount], physical, firstLogical, suffixBits, err, nil)
     // Prevent the finished requests from being processed again.
     tbc.collectedRequestCount = 0

 }

-func finishCollectedRequests(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
+func finishCollectedRequests(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error, statFunc func(latency time.Duration)) {
     for i := 0; i < len(requests); i++ {
         tsoReq := requests[i]
         // Retrieve the request context before the request is done to trace without race.
         requestCtx := tsoReq.requestCtx
         tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
         tsoReq.tryDone(err)
+        if statFunc != nil {
+            statFunc(time.Since(tsoReq.start))
+        }
         trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
     }
 }
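The new trailing statFunc parameter is what lets each TSO request's latency be measured from that request's own start time instead of once for the whole batch. Below is a minimal, self-contained sketch of that callback pattern; simplifiedReq and finishRequests are hypothetical stand-ins for the client's tsoRequest and finishCollectedRequests, not the client's actual code.

package main

import (
    "fmt"
    "time"
)

// simplifiedReq is a hypothetical stand-in for the client's tsoRequest;
// only the fields needed to show the latency callback are kept.
type simplifiedReq struct {
    start time.Time
    done  chan error
}

// finishRequests mirrors the shape of finishCollectedRequests above: it
// completes every request in the batch and, when statFunc is non-nil,
// reports each request's latency measured from its own start time.
func finishRequests(reqs []*simplifiedReq, err error, statFunc func(latency time.Duration)) {
    for _, req := range reqs {
        req.done <- err
        if statFunc != nil {
            statFunc(time.Since(req.start))
        }
    }
}

func main() {
    reqs := []*simplifiedReq{
        {start: time.Now().Add(-3 * time.Millisecond), done: make(chan error, 1)},
        {start: time.Now().Add(-1 * time.Millisecond), done: make(chan error, 1)},
    }
    // Callers that do not track latency can pass nil for statFunc,
    // just as the batch controller's method above passes nil.
    finishRequests(reqs, nil, func(latency time.Duration) {
        fmt.Printf("observed per-request latency: %v\n", latency)
    })
}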
client/tso_dispatcher.go (2 additions, 2 deletions)

@@ -533,7 +533,7 @@ func (td *tsoDispatcher) processRequests(
     return nil
 }

-func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestResult, err error) {
+func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestResult, err error, statFunc func(latency time.Duration)) {
     tbc, loaded := td.pendingBatches.LoadAndDelete(reqID)
     if !loaded {
         log.Info("received response for already abandoned requests")
@@ -551,7 +551,7 @@ func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestRe
     // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
     firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits)
     //td.compareAndSwapTS(curTSOInfo, firstLogical)
-    finishCollectedRequests(typedTbc.getCollectedRequests(), result.physical, firstLogical, result.suffixBits, err)
+    finishCollectedRequests(typedTbc.getCollectedRequests(), result.physical, firstLogical, result.suffixBits, err, statFunc)
     typedTbc.collectedRequestCount = 0
     td.batchBufferPool.Put(typedTbc)
 }
client/tso_stream.go (9 additions, 4 deletions)

@@ -217,7 +217,7 @@ type batchedReq struct {
     startTime time.Time
 }

-type tsoStreamOnRecvCallback = func(reqID uint64, res tsoRequestResult, err error)
+type tsoStreamOnRecvCallback = func(reqID uint64, res tsoRequestResult, err error, statFunc func(latency time.Duration))

 var streamIDAlloc atomic.Int32

@@ -353,6 +353,8 @@ func (s *tsoStream) recvLoop(ctx context.Context) {
         s.estimateLatencyMicros.Store(uint64(math.Exp(logEstimatedLatency)))
     }

+    statFunc := s.observeLatency
+
 recvLoop:
     for {
         select {
@@ -397,9 +399,8 @@

             // TODO: Check request and result have matching count.

-            s.onRecvCallback(req.reqID, res, nil)
+            s.onRecvCallback(req.reqID, res, nil, statFunc)
             s.onTheFlyRequestCountGauge.Set(float64(s.onTheFlyRequests.Add(-1)))
-            s.latencyHistogram.observe(latency.Seconds())
             if now.Sub(s.lastDumpHistogramTime) >= time.Minute && !s.latencyHistogramDumpWorking.Load() {
                 s.latencyHistogramDumpWorking.Store(true)
                 s.latencyHistogram, s.latencyHistogramDumping = s.latencyHistogramDumping, s.latencyHistogram
@@ -420,10 +421,14 @@ recvLoop:
         if !ok {
             break
         }
-        s.onRecvCallback(req.reqID, tsoRequestResult{}, finishWithErr)
+        s.onRecvCallback(req.reqID, tsoRequestResult{}, finishWithErr, nil)
     }
 }

+func (s *tsoStream) observeLatency(latency time.Duration) {
+    s.latencyHistogram.observe(latency.Seconds())
+}
+
 func (s *tsoStream) dumpLatencyHistogram(now time.Time) {
     defer s.latencyHistogramDumpWorking.Store(false)

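With observeLatency factored out, the stream hands its own histogram observer down as statFunc, so the histogram records one observation per finished request rather than one per received batch. A rough sketch of that wiring, assuming a toy histogram type (naiveHistogram) in place of the client's real latencyHistogram:

package main

import (
    "fmt"
    "sync"
    "time"
)

// naiveHistogram is a toy stand-in for the client's latencyHistogram;
// it just counts observations into whole-millisecond buckets.
type naiveHistogram struct {
    mu      sync.Mutex
    buckets map[int64]int
}

func newNaiveHistogram() *naiveHistogram {
    return &naiveHistogram{buckets: make(map[int64]int)}
}

func (h *naiveHistogram) observe(seconds float64) {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.buckets[int64(seconds*1000)]++
}

// stream mimics the relevant shape of tsoStream after this change: its
// observeLatency method is what gets passed down as statFunc, so every
// finished request contributes its own observation instead of the recv
// loop recording a single batch-level latency.
type stream struct {
    latencyHistogram *naiveHistogram
}

func (s *stream) observeLatency(latency time.Duration) {
    s.latencyHistogram.observe(latency.Seconds())
}

func main() {
    s := &stream{latencyHistogram: newNaiveHistogram()}
    statFunc := s.observeLatency

    // Simulate three requests of one batch that started at different times;
    // each one is observed individually when it finishes.
    starts := []time.Time{
        time.Now().Add(-5 * time.Millisecond),
        time.Now().Add(-2 * time.Millisecond),
        time.Now().Add(-1 * time.Millisecond),
    }
    for _, st := range starts {
        statFunc(time.Since(st))
    }
    fmt.Printf("histogram buckets (ms -> count): %v\n", s.latencyHistogram.buckets)
}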
