Skip to content

Commit 33657c2

Browse files
committed
stash
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
1 parent 9af28fc commit 33657c2

File tree

3 files changed

+262
-50
lines changed

3 files changed

+262
-50
lines changed

client/metrics.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ func initAndRegisterMetrics(constLabels prometheus.Labels) {
3939
}
4040

4141
var (
42-
cmdDuration *prometheus.HistogramVec
43-
cmdFailedDuration *prometheus.HistogramVec
44-
requestDuration *prometheus.HistogramVec
45-
tsoBestBatchSize prometheus.Histogram
46-
tsoBatchSize prometheus.Histogram
47-
tsoBatchSendLatency prometheus.Histogram
48-
requestForwarded *prometheus.GaugeVec
42+
cmdDuration *prometheus.HistogramVec
43+
cmdFailedDuration *prometheus.HistogramVec
44+
requestDuration *prometheus.HistogramVec
45+
tsoBestBatchSize prometheus.Histogram
46+
tsoBatchSize prometheus.Histogram
47+
tsoBatchSendLatency prometheus.Histogram
48+
requestForwarded *prometheus.GaugeVec
49+
ongoingRequestCountGauge *prometheus.GaugeVec
4950
)
5051

5152
func initMetrics(constLabels prometheus.Labels) {
@@ -117,6 +118,15 @@ func initMetrics(constLabels prometheus.Labels) {
117118
Help: "The status to indicate if the request is forwarded",
118119
ConstLabels: constLabels,
119120
}, []string{"host", "delegate"})
121+
122+
ongoingRequestCountGauge = prometheus.NewGaugeVec(
123+
prometheus.GaugeOpts{
124+
Namespace: "pd_client",
125+
Subsystem: "request",
126+
Name: "ongoing_requests_count",
127+
Help: "Current count of ongoing batch tso requests",
128+
ConstLabels: constLabels,
129+
}, []string{"stream"})
120130
}
121131

122132
var (

client/tso_dispatcher.go

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ type tsoDispatcher struct {
8181
tsDeadlineCh chan *deadline
8282
lastTSOInfo *tsoInfo
8383

84+
batchBufferPool sync.Pool
85+
8486
updateConnectionCtxsCh chan struct{}
8587
}
8688

@@ -186,7 +188,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
186188
streamCtx context.Context
187189
cancel context.CancelFunc
188190
streamURL string
189-
stream *tsoStream
191+
stream *tsoStream[notifier]
190192
)
191193
// Loop through each batch of TSO requests and send them for processing.
192194
streamLoopTimer := time.NewTimer(option.timeout)
@@ -422,26 +424,44 @@ func (td *tsoDispatcher) processRequests(
422424
keyspaceID = svcDiscovery.GetKeyspaceID()
423425
reqKeyspaceGroupID = svcDiscovery.GetKeyspaceGroupID()
424426
)
425-
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
427+
428+
cb := func(result tsoRequestResult, reqKeyspaceGroupID uint32, streamURL string, err error) {
429+
curTSOInfo := &tsoInfo{
430+
tsoServer: stream.getServerURL(),
431+
reqKeyspaceGroupID: reqKeyspaceGroupID,
432+
respKeyspaceGroupID: result.respKeyspaceGroupID,
433+
respReceivedAt: time.Now(),
434+
physical: result.physical,
435+
logical: result.logical,
436+
}
437+
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
438+
firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits)
439+
td.compareAndSwapTS(curTSOInfo, firstLogical)
440+
finishCollectedRequests(tbc.getCollectedRequests(), result.physical, firstLogical, result.suffixBits, err)
441+
tbc.collectedRequestCount = 0
442+
td.batchBufferPool.Put(tbc)
443+
}
444+
445+
err := stream.processRequests(
426446
clusterID, keyspaceID, reqKeyspaceGroupID,
427-
dcLocation, count, tbc.batchStartTime)
447+
dcLocation, count, tbc.batchStartTime, cb)
428448
if err != nil {
429449
tbc.finishCollectedRequests(0, 0, 0, err)
430450
return err
431451
}
432-
curTSOInfo := &tsoInfo{
433-
tsoServer: stream.getServerURL(),
434-
reqKeyspaceGroupID: reqKeyspaceGroupID,
435-
respKeyspaceGroupID: respKeyspaceGroupID,
436-
respReceivedAt: time.Now(),
437-
physical: physical,
438-
logical: logical,
439-
}
440-
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
441-
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
442-
td.compareAndSwapTS(curTSOInfo, firstLogical)
443-
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
444-
return nil
452+
//curTSOInfo := &tsoInfo{
453+
// tsoServer: stream.getServerURL(),
454+
// reqKeyspaceGroupID: reqKeyspaceGroupID,
455+
// respKeyspaceGroupID: respKeyspaceGroupID,
456+
// respReceivedAt: time.Now(),
457+
// physical: physical,
458+
// logical: logical,
459+
//}
460+
//// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
461+
//firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
462+
//td.compareAndSwapTS(curTSOInfo, firstLogical)
463+
//tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
464+
//return nil
445465
}
446466

447467
func (td *tsoDispatcher) compareAndSwapTS(
@@ -482,3 +502,24 @@ func (td *tsoDispatcher) compareAndSwapTS(
482502
}
483503
td.lastTSOInfo = curTSOInfo
484504
}
505+
506+
type notifier struct {
507+
tbc *tsoBatchController
508+
td *tsoDispatcher
509+
}
510+
511+
func (n notifier) finish(result tsoRequestResult, err error) {
512+
curTSOInfo := &tsoInfo{
513+
tsoServer: stream.getServerURL(),
514+
reqKeyspaceGroupID: reqKeyspaceGroupID,
515+
respKeyspaceGroupID: result.respKeyspaceGroupID,
516+
respReceivedAt: time.Now(),
517+
physical: result.physical,
518+
logical: result.logical,
519+
}
520+
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
521+
firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits)
522+
n.td.compareAndSwapTS(curTSOInfo, firstLogical)
523+
finishCollectedRequests(n.tbc.getCollectedRequests(), result.physical, firstLogical, result.suffixBits, err)
524+
n.tbc.collectedRequestCount = 0
525+
}

0 commit comments

Comments
 (0)