Add QueryRegion related metrics
Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato committed Feb 14, 2025
1 parent b7977dd commit 6b9ccfc
Showing 9 changed files with 708 additions and 23 deletions.
22 changes: 20 additions & 2 deletions client/clients/router/client.go
@@ -33,6 +33,7 @@ import (
"github.com/pingcap/log"

"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/batch"
cctx "github.com/tikv/pd/client/pkg/connectionctx"
@@ -209,8 +210,12 @@ func NewClient(
}
},
},
requestCh: make(chan *Request, defaultMaxRouterRequestBatchSize*2),
batchController: batch.NewController(defaultMaxRouterRequestBatchSize, requestFinisher(nil), nil),
requestCh: make(chan *Request, defaultMaxRouterRequestBatchSize*2),
batchController: batch.NewController(
defaultMaxRouterRequestBatchSize,
requestFinisher(nil),
metrics.QueryRegionBestBatchSize,
),
}
c.leaderURL.Store(svcDiscovery.GetServingURL())
c.svcDiscovery.ExecAndAddLeaderSwitchedCallback(c.updateLeaderURL)
@@ -234,6 +239,7 @@ func (c *Cli) newRequest(ctx context.Context) *Request {
req.needBuckets = false
req.region = nil
// Initialize the runtime fields.
req.start = time.Now()
req.pool = c.reqPool

return req
@@ -523,14 +529,26 @@ func (c *Cli) processRequests(stream pdpb.PD_QueryRegionClient) error {
panic("invalid region query request received")
}
}
start := time.Now()
err := stream.Send(queryReq)
if err != nil {
return err
}
metrics.QueryRegionBatchSendLatency.Observe(
time.Since(
c.batchController.GetExtraBatchingStartTime(),
).Seconds(),
)
resp, err := stream.Recv()
if err != nil {
metrics.RequestFailedDurationQueryRegion.Observe(time.Since(start).Seconds())
return err
}
metrics.RequestDurationQueryRegion.Observe(time.Since(start).Seconds())
metrics.QueryRegionBatchSizeTotal.Observe(float64(len(requests)))
metrics.QueryRegionBatchSizeByKeys.Observe(float64(len(queryReq.Keys)))
metrics.QueryRegionBatchSizeByPrevKeys.Observe(float64(len(queryReq.PrevKeys)))
metrics.QueryRegionBatchSizeByIDs.Observe(float64(len(queryReq.Ids)))
c.doneCollectedRequests(resp)
return nil
}
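For context on the batch-size observations above: each flushed batch yields one "total" observation plus one per query form (by key, by previous key, by region ID). A minimal sketch of the same accounting, assuming a hypothetical queryRegionRequest stand-in for the real pdpb.QueryRegionRequest:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// queryRegionRequest is a hypothetical stand-in for pdpb.QueryRegionRequest;
// one batched request can carry all three query forms at once.
type queryRegionRequest struct {
	Keys     [][]byte // regions queried by key
	PrevKeys [][]byte // regions queried by previous key
	Ids      []uint64 // regions queried by region ID
}

var batchSize = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "pd_client",
	Subsystem: "request",
	Name:      "handle_query_region_batch_size",
	Help:      "Bucketed histogram of the batch size of handled query region requests.",
	Buckets:   []float64{1, 2, 4, 8, 16, 32, 64, 128},
}, []string{"type"})

// observeBatch mirrors the four Observe calls in processRequests: one for
// the whole batch and one for each query form it contains.
func observeBatch(req *queryRegionRequest, total int) {
	batchSize.WithLabelValues("total").Observe(float64(total))
	batchSize.WithLabelValues("by_keys").Observe(float64(len(req.Keys)))
	batchSize.WithLabelValues("by_prev_keys").Observe(float64(len(req.PrevKeys)))
	batchSize.WithLabelValues("by_ids").Observe(float64(len(req.Ids)))
}

func main() {
	prometheus.MustRegister(batchSize)
	req := &queryRegionRequest{
		Keys: [][]byte{[]byte("a"), []byte("b")},
		Ids:  []uint64{42},
	}
	observeBatch(req, 3) // a batch of three: two by key, one by ID
	fmt.Println("batch sizes observed")
}
```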
13 changes: 11 additions & 2 deletions client/clients/router/request.go
@@ -18,9 +18,11 @@ import (
"context"
"runtime/trace"
"sync"
"time"

"github.com/pingcap/errors"

"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
)

@@ -44,7 +46,8 @@ type Request struct {
region *Region

// Runtime fields.
pool *sync.Pool
start time.Time
pool *sync.Pool
}

func (req *Request) tryDone(err error) {
@@ -55,14 +58,20 @@ }
}

func (req *Request) wait() (*Region, error) {
// TODO: introduce the metrics.
start := time.Now()
metrics.CmdDurationQueryRegionAsyncWait.Observe(start.Sub(req.start).Seconds())
select {
case err := <-req.done:
defer req.pool.Put(req)
defer trace.StartRegion(req.requestCtx, "pdclient.regionReqDone").End()
now := time.Now()
if err != nil {
metrics.CmdFailedDurationQueryRegionWait.Observe(now.Sub(start).Seconds())
metrics.CmdFailedDurationQueryRegion.Observe(now.Sub(req.start).Seconds())
return nil, errors.WithStack(err)
}
metrics.CmdDurationQueryRegionWait.Observe(now.Sub(start).Seconds())
metrics.CmdDurationQueryRegion.Observe(now.Sub(req.start).Seconds())
return req.region, nil
case <-req.requestCtx.Done():
return nil, errors.WithStack(req.requestCtx.Err())
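The wait() change above splits one request's lifetime into three measurements: the async phase (from req.start, stamped in newRequest, until the caller enters wait()), the blocking wait itself, and the end-to-end total; failures feed the failed-duration observers instead. A minimal sketch of that split under the same assumptions, printing durations where the real code calls Observe:

```go
package main

import (
	"fmt"
	"time"
)

// request mimics the runtime fields of router.Request that matter here.
type request struct {
	start time.Time  // stamped when the request is created
	done  chan error // signaled once the batched RPC completes
}

// wait reproduces the three-phase timing: async wait, blocking wait, total.
func (req *request) wait() error {
	waitStart := time.Now()
	// Phase 1: time between creation and the caller starting to wait
	// (CmdDurationQueryRegionAsyncWait in the real code).
	fmt.Printf("async wait: %v\n", waitStart.Sub(req.start))
	err := <-req.done
	now := time.Now()
	if err != nil {
		// Failures are recorded under the CmdFailedDuration* observers.
		fmt.Printf("failed wait: %v, failed total: %v\n",
			now.Sub(waitStart), now.Sub(req.start))
		return err
	}
	// Phase 2: the blocking wait (CmdDurationQueryRegionWait).
	// Phase 3: end-to-end duration (CmdDurationQueryRegion).
	fmt.Printf("wait: %v, total: %v\n", now.Sub(waitStart), now.Sub(req.start))
	return nil
}

func main() {
	req := &request{start: time.Now(), done: make(chan error, 1)}
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate batching plus the RPC
		req.done <- nil
	}()
	time.Sleep(2 * time.Millisecond) // simulate caller work before waiting
	_ = req.wait()
}
```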
1 change: 1 addition & 0 deletions client/inner_client.go
@@ -214,6 +214,7 @@ func (c *innerClient) setup() error {

// Create dispatchers
c.createTokenDispatcher()

return nil
}

73 changes: 70 additions & 3 deletions client/metrics/metrics.go
@@ -26,7 +26,7 @@ var initialized int32

func init() {
initMetrics(prometheus.Labels{})
initCmdDurations()
initLabelValues()
initRegisteredConsumers()
}

@@ -56,7 +56,7 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) {
if atomic.CompareAndSwapInt32(&initialized, 0, 1) {
// init metrics with constLabels
initMetrics(constLabels)
initCmdDurations()
initLabelValues()
initRegisteredConsumers()
// register metrics
registerMetrics()
@@ -84,6 +84,12 @@ var (
EstimateTSOLatencyGauge *prometheus.GaugeVec
// CircuitBreakerCounters is a vector for different circuit breaker counters
CircuitBreakerCounters *prometheus.CounterVec
// QueryRegionBestBatchSize is the histogram of the best batch size of query region requests.
QueryRegionBestBatchSize prometheus.Histogram
// QueryRegionBatchSize is the histogram of the batch size of query region requests.
QueryRegionBatchSize *prometheus.HistogramVec
// QueryRegionBatchSendLatency is the histogram of the latency of sending query region requests.
QueryRegionBatchSendLatency prometheus.Histogram
)

func initMetrics(constLabels prometheus.Labels) {
@@ -201,6 +207,36 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "Circuit breaker counters",
ConstLabels: constLabels,
}, []string{"name", "event"})

QueryRegionBestBatchSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "handle_query_region_best_batch_size",
Help: "Bucketed histogram of the best batch size of handled query region requests.",
ConstLabels: constLabels,
Buckets: prometheus.ExponentialBuckets(1, 2, 13),
})

QueryRegionBatchSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "handle_query_region_batch_size",
Help: "Bucketed histogram of the batch size of handled query region requests.",
ConstLabels: constLabels,
Buckets: []float64{1, 2, 4, 8, 10, 14, 18, 22, 26, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 110, 120, 140, 160, 180, 200, 500, 1000},
}, []string{"type"})

QueryRegionBatchSendLatency = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "query_region_batch_send_latency",
ConstLabels: constLabels,
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
Help: "Bucketed histogram of the latency of sending batched query region requests.",
})
}

// CmdDurationXXX and CmdFailedDurationXXX are the durations of the client commands.
@@ -230,6 +266,9 @@ var (
CmdDurationPut prometheus.Observer
CmdDurationUpdateGCSafePointV2 prometheus.Observer
CmdDurationUpdateServiceSafePointV2 prometheus.Observer
CmdDurationQueryRegionAsyncWait prometheus.Observer
CmdDurationQueryRegionWait prometheus.Observer
CmdDurationQueryRegion prometheus.Observer

CmdFailedDurationGetRegion prometheus.Observer
CmdFailedDurationTSOWait prometheus.Observer
@@ -249,6 +288,9 @@ var (
CmdFailedDurationPut prometheus.Observer
CmdFailedDurationUpdateGCSafePointV2 prometheus.Observer
CmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer
CmdFailedDurationQueryRegionAsyncWait prometheus.Observer
CmdFailedDurationQueryRegionWait prometheus.Observer
CmdFailedDurationQueryRegion prometheus.Observer

InternalCmdDurationGetClusterInfo prometheus.Observer
InternalCmdDurationGetMembers prometheus.Observer
@@ -260,9 +302,18 @@ var (
RequestDurationTSO prometheus.Observer
// RequestFailedDurationTSO records the durations of the failed TSO requests.
RequestFailedDurationTSO prometheus.Observer
// RequestDurationQueryRegion records the durations of the successful query region requests.
RequestDurationQueryRegion prometheus.Observer
// RequestFailedDurationQueryRegion records the durations of the failed query region requests.
RequestFailedDurationQueryRegion prometheus.Observer

QueryRegionBatchSizeTotal prometheus.Observer
QueryRegionBatchSizeByKeys prometheus.Observer
QueryRegionBatchSizeByPrevKeys prometheus.Observer
QueryRegionBatchSizeByIDs prometheus.Observer
)

func initCmdDurations() {
func initLabelValues() {
// WithLabelValues is a heavy operation, so define variables to avoid calling it every time.
CmdDurationTSOWait = cmdDuration.WithLabelValues("wait")
CmdDurationTSO = cmdDuration.WithLabelValues("tso")
@@ -289,6 +340,9 @@
CmdDurationPut = cmdDuration.WithLabelValues("put")
CmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
CmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")
CmdDurationQueryRegionAsyncWait = cmdDuration.WithLabelValues("query_region_async_wait")
CmdDurationQueryRegionWait = cmdDuration.WithLabelValues("query_region_wait")
CmdDurationQueryRegion = cmdDuration.WithLabelValues("query_region")

CmdFailedDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
CmdFailedDurationTSOWait = cmdFailedDuration.WithLabelValues("wait")
@@ -308,6 +362,9 @@
CmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put")
CmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
CmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")
CmdFailedDurationQueryRegionAsyncWait = cmdFailedDuration.WithLabelValues("query_region_async_wait")
CmdFailedDurationQueryRegionWait = cmdFailedDuration.WithLabelValues("query_region_wait")
CmdFailedDurationQueryRegion = cmdFailedDuration.WithLabelValues("query_region")

InternalCmdDurationGetClusterInfo = internalCmdDuration.WithLabelValues("get_cluster_info")
InternalCmdDurationGetMembers = internalCmdDuration.WithLabelValues("get_members")
@@ -317,6 +374,13 @@

RequestDurationTSO = requestDuration.WithLabelValues("tso")
RequestFailedDurationTSO = requestDuration.WithLabelValues("tso-failed")
RequestDurationQueryRegion = requestDuration.WithLabelValues("query_region")
RequestFailedDurationQueryRegion = requestDuration.WithLabelValues("query_region-failed")

QueryRegionBatchSizeTotal = QueryRegionBatchSize.WithLabelValues("total")
QueryRegionBatchSizeByKeys = QueryRegionBatchSize.WithLabelValues("by_keys")
QueryRegionBatchSizeByPrevKeys = QueryRegionBatchSize.WithLabelValues("by_prev_keys")
QueryRegionBatchSizeByIDs = QueryRegionBatchSize.WithLabelValues("by_ids")
}

func registerMetrics() {
@@ -331,4 +395,7 @@ func registerMetrics() {
prometheus.MustRegister(RequestForwarded)
prometheus.MustRegister(EstimateTSOLatencyGauge)
prometheus.MustRegister(CircuitBreakerCounters)
prometheus.MustRegister(QueryRegionBestBatchSize)
prometheus.MustRegister(QueryRegionBatchSize)
prometheus.MustRegister(QueryRegionBatchSendLatency)
}
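One pattern worth noting in this file: initLabelValues resolves every labeled observer exactly once, because HistogramVec.WithLabelValues hashes the label values and does a map lookup on each call, which is wasteful on a hot path. A minimal sketch of the pattern, with a hypothetical histogram vector standing in for the client's real ones:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// A hypothetical histogram vector labeled by request type.
var requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "pd_client",
	Subsystem: "request",
	Name:      "handle_requests_duration_seconds",
	Help:      "Bucketed histogram of processing time (s) of handled requests.",
	Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{"type"})

// Resolved once at init time, as initLabelValues does, so the hot path is a
// plain Observe call with no label hashing or map lookup.
var requestDurationQueryRegion prometheus.Observer

func initLabelValues() {
	requestDurationQueryRegion = requestDuration.WithLabelValues("query_region")
}

func main() {
	prometheus.MustRegister(requestDuration)
	initLabelValues()

	start := time.Now()
	time.Sleep(5 * time.Millisecond) // stand-in for issuing the request
	requestDurationQueryRegion.Observe(time.Since(start).Seconds())
}
```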