Skip to content

Commit

Permalink
Merge pull request #131 from fuweid/weifu/pl-on-url
Browse files Browse the repository at this point in the history
*: track latency for each kind of requests
  • Loading branch information
fuweid authored Oct 15, 2024
2 parents d762844 + 3d20c9c commit 652487d
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 37 deletions.
10 changes: 6 additions & 4 deletions api/types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (r *ResponseErrorStats) Merge(from *ResponseErrorStats) {
type ResponseStats struct {
// ErrorStats means summary of errors.
ErrorStats ResponseErrorStats
// Latencies stores all the observed latencies.
Latencies []float64
// LatenciesByURL stores all the observed latencies for each request.
LatenciesByURL map[string][]float64
// TotalReceivedBytes is total bytes read from apiserver.
TotalReceivedBytes int64
}
Expand All @@ -80,10 +80,12 @@ type RunnerMetricReport struct {
ErrorStats ResponseErrorStats `json:"errorStats"`
// TotalReceivedBytes is total bytes read from apiserver.
TotalReceivedBytes int64 `json:"totalReceivedBytes"`
// Latencies stores all the observed latencies.
Latencies []float64 `json:"latencies,omitempty"`
// LatenciesByURL stores all the observed latencies.
LatenciesByURL map[string][]float64 `json:"latenciesByURL,omitempty"`
// PercentileLatencies represents the latency distribution in seconds.
PercentileLatencies [][2]float64 `json:"percentileLatencies,omitempty"`
// PercentileLatenciesByURL represents the latency distribution in seconds per request.
PercentileLatenciesByURL map[string][][2]float64 `json:"percentileLatenciesByURL,omitempty"`
}

// TODO(weifu): build brand new struct for RunnerGroupsReport to include more
Expand Down
34 changes: 24 additions & 10 deletions cmd/kperf/commands/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,35 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) {
// printResponseStats prints types.RunnerMetricReport into underlying file.
func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error {
output := types.RunnerMetricReport{
Total: stats.Total,
ErrorStats: stats.ErrorStats,
Duration: stats.Duration.String(),
Latencies: stats.Latencies,
TotalReceivedBytes: stats.TotalReceivedBytes,
PercentileLatencies: metrics.BuildPercentileLatencies(stats.Latencies),
Total: stats.Total,
ErrorStats: stats.ErrorStats,
Duration: stats.Duration.String(),
TotalReceivedBytes: stats.TotalReceivedBytes,

PercentileLatenciesByURL: map[string][][2]float64{},
}

encoder := json.NewEncoder(f)
encoder.SetIndent("", " ")
total := 0
for _, latencies := range stats.LatenciesByURL {
total += len(latencies)
}
latencies := make([]float64, 0, total)
for _, l := range stats.LatenciesByURL {
latencies = append(latencies, l...)
}
output.PercentileLatencies = metrics.BuildPercentileLatencies(latencies)

if !rawDataFlagIncluded {
output.Latencies = nil
for u, l := range stats.LatenciesByURL {
output.PercentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(l)
}

if rawDataFlagIncluded {
output.LatenciesByURL = stats.LatenciesByURL
}

encoder := json.NewEncoder(f)
encoder.SetIndent("", " ")

err := encoder.Encode(output)
if err != nil {
return fmt.Errorf("failed to encode json: %w", err)
Expand Down
39 changes: 25 additions & 14 deletions metrics/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// ResponseMetric is a measurement related to http response.
type ResponseMetric interface {
// ObserveLatency observes latency.
ObserveLatency(seconds float64)
ObserveLatency(url string, seconds float64)
// ObserveFailure observes failure response.
ObserveFailure(err error)
// ObserveReceivedBytes observes the bytes read from apiserver.
Expand All @@ -21,24 +21,30 @@ type ResponseMetric interface {
}

type responseMetricImpl struct {
mu sync.Mutex
errorStats *types.ResponseErrorStats
latencies *list.List
receivedBytes int64
mu sync.Mutex
errorStats *types.ResponseErrorStats
receivedBytes int64
latenciesByURLs map[string]*list.List
}

func NewResponseMetric() ResponseMetric {
return &responseMetricImpl{
latencies: list.New(),
errorStats: types.NewResponseErrorStats(),
errorStats: types.NewResponseErrorStats(),
latenciesByURLs: map[string]*list.List{},
}
}

// ObserveLatency implements ResponseMetric.
func (m *responseMetricImpl) ObserveLatency(seconds float64) {
func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) {
m.mu.Lock()
defer m.mu.Unlock()
m.latencies.PushBack(seconds)

l, ok := m.latenciesByURLs[url]
if !ok {
m.latenciesByURLs[url] = list.New()
l = m.latenciesByURLs[url]
}
l.PushBack(seconds)
}

// ObserveFailure implements ResponseMetric.
Expand Down Expand Up @@ -73,17 +79,22 @@ func (m *responseMetricImpl) ObserveReceivedBytes(bytes int64) {
func (m *responseMetricImpl) Gather() types.ResponseStats {
return types.ResponseStats{
ErrorStats: m.dumpErrorStats(),
Latencies: m.dumpLatencies(),
LatenciesByURL: m.dumpLatencies(),
TotalReceivedBytes: atomic.LoadInt64(&m.receivedBytes),
}
}

func (m *responseMetricImpl) dumpLatencies() []float64 {
func (m *responseMetricImpl) dumpLatencies() map[string][]float64 {
m.mu.Lock()
defer m.mu.Unlock()
res := make([]float64, 0, m.latencies.Len())
for e := m.latencies.Front(); e != nil; e = e.Next() {
res = append(res, e.Value.(float64))

res := make(map[string][]float64)
for u, latencies := range m.latenciesByURLs {
res[u] = make([]float64, 0, latencies.Len())

for e := latencies.Front(); e != nil; e = e.Next() {
res[u] = append(res[u], e.Value.(float64))
}
}
return res
}
Expand Down
2 changes: 1 addition & 1 deletion request/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I
klog.V(5).Infof("Request stream failed: %v", err)
return
}
respMetric.ObserveLatency(latency)
respMetric.ObserveLatency(req.URL().String(), latency)
}()
}
}(cli)
Expand Down
36 changes: 28 additions & 8 deletions runner/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func buildNetListeners(addrs []string) (_ []net.Listener, retErr error) {
// buildRunnerGroupSummary returns aggrecated summary from runner groups' report.
func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *types.RunnerMetricReport {
totalBytes := int64(0)
latencies := list.New()
totalResp := 0
latenciesByURL := map[string]*list.List{}
errStats := types.NewResponseErrorStats()
maxDuration := 0 * time.Second

Expand Down Expand Up @@ -90,8 +91,16 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type
totalBytes += report.TotalReceivedBytes

// update latencies
for _, v := range report.Latencies {
latencies.PushBack(v)
for u, l := range report.LatenciesByURL {
latencies, ok := latenciesByURL[u]
if !ok {
latenciesByURL[u] = list.New()
latencies = latenciesByURL[u]
}
for _, v := range l {
totalResp++
latencies.PushBack(v)
}
}

// update error stats
Expand All @@ -109,12 +118,23 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type
}
}

percentileLatenciesByURL := map[string][][2]float64{}

latencies := make([]float64, 0, totalResp)
for u, l := range latenciesByURL {
lInSlice := listToSliceFloat64(l)

latencies = append(latencies, lInSlice...)
percentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(lInSlice)
}

return &types.RunnerMetricReport{
Total: latencies.Len(),
ErrorStats: *errStats,
Duration: maxDuration.String(),
TotalReceivedBytes: totalBytes,
PercentileLatencies: metrics.BuildPercentileLatencies(listToSliceFloat64(latencies)),
Total: totalResp,
ErrorStats: *errStats,
Duration: maxDuration.String(),
TotalReceivedBytes: totalBytes,
PercentileLatencies: metrics.BuildPercentileLatencies(latencies),
PercentileLatenciesByURL: percentileLatenciesByURL,
}
}

Expand Down

0 comments on commit 652487d

Please sign in to comment.