From 770c19b71b22cdcd1c5117e6d3919e8f5788e8df Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 2 Jul 2024 16:44:37 +0800 Subject: [PATCH] Fix latency measurement Signed-off-by: MyonKeminta --- client/tso_batch_controller.go | 7 +++++-- client/tso_dispatcher.go | 4 ++-- client/tso_stream.go | 13 +++++++++---- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go index f446d98fc43a..718ba905f9c6 100644 --- a/client/tso_batch_controller.go +++ b/client/tso_batch_controller.go @@ -136,19 +136,22 @@ func (tbc *tsoBatchController) adjustBestBatchSize() { } func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) { - finishCollectedRequests(tbc.collectedRequests[:tbc.collectedRequestCount], physical, firstLogical, suffixBits, err) + finishCollectedRequests(tbc.collectedRequests[:tbc.collectedRequestCount], physical, firstLogical, suffixBits, err, nil) // Prevent the finished requests from being processed again. tbc.collectedRequestCount = 0 } -func finishCollectedRequests(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) { +func finishCollectedRequests(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error, statFunc func(latency time.Duration)) { for i := 0; i < len(requests); i++ { tsoReq := requests[i] // Retrieve the request context before the request is done to trace without race. requestCtx := tsoReq.requestCtx tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) tsoReq.tryDone(err) + if statFunc != nil { + statFunc(time.Since(tsoReq.start)) + } trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End() } } diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 4eca0d87d35c..48ddaefb7b02 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -533,7 +533,7 @@ func (td *tsoDispatcher) processRequests( return nil } -func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestResult, err error) { +func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestResult, err error, statFunc func(latency time.Duration)) { tbc, loaded := td.pendingBatches.LoadAndDelete(reqID) if !loaded { log.Info("received response for already abandoned requests") @@ -551,7 +551,7 @@ func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestRe // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits) //td.compareAndSwapTS(curTSOInfo, firstLogical) - finishCollectedRequests(typedTbc.getCollectedRequests(), result.physical, firstLogical, result.suffixBits, err) + finishCollectedRequests(typedTbc.getCollectedRequests(), result.physical, firstLogical, result.suffixBits, err, statFunc) typedTbc.collectedRequestCount = 0 td.batchBufferPool.Put(typedTbc) } diff --git a/client/tso_stream.go b/client/tso_stream.go index a8f5ebf4c697..0914162df1f1 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -217,7 +217,7 @@ type batchedReq struct { startTime time.Time } -type tsoStreamOnRecvCallback = func(reqID uint64, res tsoRequestResult, err error) +type tsoStreamOnRecvCallback = func(reqID uint64, res tsoRequestResult, err error, statFunc func(latency time.Duration)) var streamIDAlloc atomic.Int32 @@ -353,6 +353,8 @@ func (s *tsoStream) recvLoop(ctx context.Context) { s.estimateLatencyMicros.Store(uint64(math.Exp(logEstimatedLatency))) } + statFunc := s.observeLatency + recvLoop: for { select { @@ -397,9 +399,8 @@ recvLoop: // TODO: Check request and result have matching count. - s.onRecvCallback(req.reqID, res, nil) + s.onRecvCallback(req.reqID, res, nil, statFunc) s.onTheFlyRequestCountGauge.Set(float64(s.onTheFlyRequests.Add(-1))) - s.latencyHistogram.observe(latency.Seconds()) if now.Sub(s.lastDumpHistogramTime) >= time.Minute && !s.latencyHistogramDumpWorking.Load() { s.latencyHistogramDumpWorking.Store(true) s.latencyHistogram, s.latencyHistogramDumping = s.latencyHistogramDumping, s.latencyHistogram @@ -420,10 +421,14 @@ recvLoop: if !ok { break } - s.onRecvCallback(req.reqID, tsoRequestResult{}, finishWithErr) + s.onRecvCallback(req.reqID, tsoRequestResult{}, finishWithErr, nil) } } +func (s *tsoStream) observeLatency(latency time.Duration) { + s.latencyHistogram.observe(latency.Seconds()) +} + func (s *tsoStream) dumpLatencyHistogram(now time.Time) { defer s.latencyHistogramDumpWorking.Store(false)