Skip to content

Commit

Permalink
*: refactor error stats
Browse files Browse the repository at this point in the history
Introduce the ResponseError type to record each error the client received. It
includes a timestamp, the timespan in seconds and the error message. The
RunnerMetricReport will export raw data about each response error, which
can help us build views of the errors.

```go
// ResponseError is the record about that error.
type ResponseError struct {
       // Timestamp indicates when this error was received.
       Timestamp time.Time `json:"timestamp"`
       // Duration records timespan in seconds.
       Duration float64 `json:"duration"`
       // Type indicates that category to which the error belongs.
       Type ResponseErrorType `json:"type"`
       // Code only works when Type is http.
       Code int `json:"code,omitempty"`
       // Message shows error message for this error.
       //
       // NOTE: When Type is http, this field will be empty.
       Message string `json:"message,omitempty"`
}
```

Signed-off-by: Wei Fu <weifu@microsoft.com>
  • Loading branch information
fuweid committed Dec 15, 2024
1 parent c5772de commit a182551
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 177 deletions.
108 changes: 34 additions & 74 deletions api/types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,43 @@

package types

// HTTP2ErrorStats is the report about http2 errors observed during testing.
//
// Each map holds int32 counters keyed by a string. Both fields carry the
// omitempty JSON option, so an empty map is left out of the encoded report.
type HTTP2ErrorStats struct {
// ConnectionErrors counts connection-level errors.
ConnectionErrors map[string]int32 `json:"connectionErrors,omitempty"`
// StreamErrors counts stream-level errors.
StreamErrors map[string]int32 `json:"streamErrors,omitempty"`
}

// NewHTTP2ErrorStats builds an HTTP2ErrorStats whose two counter maps are
// already allocated, so callers can increment entries right away.
func NewHTTP2ErrorStats() *HTTP2ErrorStats {
	// Capacity hint shared by both counter maps.
	const initialCap = 10

	stats := new(HTTP2ErrorStats)
	stats.ConnectionErrors = make(map[string]int32, initialCap)
	stats.StreamErrors = make(map[string]int32, initialCap)
	return stats
}

// ResponseErrorStats is the report about errors, grouped by category.
//
// None of the fields use the omitempty JSON option, so every field is
// always present in the encoded report.
type ResponseErrorStats struct {
// UnknownErrors lists the messages of all errors that could not be
// put into any other category.
UnknownErrors []string `json:"unknownErrors"`
// NetErrors counts errors coming from the net layer, keyed by a string.
NetErrors map[string]int32 `json:"netErrors"`
// ResponseCodes records the number of requests grouped by response
// code between 400 and 600.
ResponseCodes map[int]int32 `json:"responseCodes"`
// HTTP2Errors records http2 related errors.
HTTP2Errors HTTP2ErrorStats `json:"http2Errors"`
}

// NewResponseErrorStats builds an empty ResponseErrorStats in which the
// slice and every map are already allocated, so callers can append to or
// increment them without further initialization.
func NewResponseErrorStats() *ResponseErrorStats {
	stats := new(ResponseErrorStats)
	stats.UnknownErrors = make([]string, 0, 1024)
	stats.NetErrors = make(map[string]int32, 10)
	stats.ResponseCodes = make(map[int]int32)
	// Stored by value: copy the freshly built HTTP2 stats into the field.
	stats.HTTP2Errors = *NewHTTP2ErrorStats()
	return stats
}
import "time"

// Copy clones self.
func (r *ResponseErrorStats) Copy() ResponseErrorStats {
res := NewResponseErrorStats()
// ResponseErrorType is error type of response.
type ResponseErrorType string

res.UnknownErrors = make([]string, len(r.UnknownErrors))
copy(res.UnknownErrors, r.UnknownErrors)
res.NetErrors = cloneMap(r.NetErrors)
res.ResponseCodes = cloneMap(r.ResponseCodes)
res.HTTP2Errors.ConnectionErrors = cloneMap(r.HTTP2Errors.ConnectionErrors)
res.HTTP2Errors.StreamErrors = cloneMap(r.HTTP2Errors.StreamErrors)
return *res
}
// Known values of ResponseErrorType.
const (
// ResponseErrorTypeUnknown indicates that the error does not fit any
// of the other categories.
ResponseErrorTypeUnknown ResponseErrorType = "unknown"
// ResponseErrorTypeHTTP indicates that the response returned an http code >= 400.
ResponseErrorTypeHTTP ResponseErrorType = "http"
// ResponseErrorTypeHTTP2Protocol indicates that the error comes from the http2 layer.
ResponseErrorTypeHTTP2Protocol ResponseErrorType = "http2-protocol"
// ResponseErrorTypeConnection indicates that the error is related to the connection.
// For instance, connection refused caused by the server being down.
ResponseErrorTypeConnection ResponseErrorType = "connection"
)

// Merge merges two ResponseErrorStats.
func (r *ResponseErrorStats) Merge(from *ResponseErrorStats) {
r.UnknownErrors = append(r.UnknownErrors, from.UnknownErrors...)
mergeMap(r.NetErrors, from.NetErrors)
mergeMap(r.ResponseCodes, from.ResponseCodes)
mergeMap(r.HTTP2Errors.ConnectionErrors, from.HTTP2Errors.ConnectionErrors)
mergeMap(r.HTTP2Errors.StreamErrors, from.HTTP2Errors.StreamErrors)
// ResponseError is the record of a single error received by the client.
type ResponseError struct {
// Timestamp indicates when this error was received.
Timestamp time.Time `json:"timestamp"`
// Duration records the timespan in seconds.
Duration float64 `json:"duration"`
// Type indicates the category to which the error belongs.
Type ResponseErrorType `json:"type"`
// Code is only meaningful when Type is http. It is omitted from the
// JSON output when zero.
Code int `json:"code,omitempty"`
// Message shows the error message for this error. It is omitted from
// the JSON output when empty.
//
// NOTE: When Type is http, this field will be empty.
Message string `json:"message,omitempty"`
}

// ResponseStats is the report about benchmark result.
type ResponseStats struct {
// ErrorStats means summary of errors.
ErrorStats ResponseErrorStats
// Errors stores all the observed errors.
Errors []ResponseError
// LatenciesByURL stores all the observed latencies for each request.
LatenciesByURL map[string][]float64
// TotalReceivedBytes is total bytes read from apiserver.
Expand All @@ -79,8 +51,10 @@ type RunnerMetricReport struct {
Total int `json:"total"`
// Duration means the time of benchmark.
Duration string `json:"duration"`
// ErrorStats means summary of errors.
ErrorStats ResponseErrorStats `json:"errorStats"`
// Errors stores all the observed errors.
Errors []ResponseError `json:"errors,omitempty"`
// ErrorStats means summary of errors group by type.
ErrorStats map[string]int32 `json:"errorStats,omitempty"`
// TotalReceivedBytes is total bytes read from apiserver.
TotalReceivedBytes int64 `json:"totalReceivedBytes"`
// LatenciesByURL stores all the observed latencies.
Expand All @@ -94,17 +68,3 @@ type RunnerMetricReport struct {
// TODO(weifu): build brand new struct for RunnerGroupsReport to include more
// information, like how many runner groups, service account and flow control.
type RunnerGroupsReport = RunnerMetricReport

// mergeMap adds every counter in from onto the entry with the same key in
// to, modifying to in place. Keys missing from to start at zero.
//
// to must be non-nil whenever from has entries, because writing to a nil
// map panics.
func mergeMap[K comparable, V int32](to, from map[K]V) {
	for k := range from {
		to[k] = to[k] + from[k]
	}
}

// cloneMap returns a shallow copy of src that shares no storage with it.
//
// The result is always a non-nil, writable map, even when src is nil or
// empty, so callers may insert into it directly. It is allocated with a
// capacity hint of len(src) so that filling it does not trigger
// incremental growth.
func cloneMap[K comparable, V int32](src map[K]V) map[K]V {
	res := make(map[K]V, len(src))
	for key, value := range src {
		res[key] = value
	}
	return res
}
3 changes: 2 additions & 1 deletion cmd/kperf/commands/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) {
func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error {
output := types.RunnerMetricReport{
Total: stats.Total,
ErrorStats: stats.ErrorStats,
ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors),
Duration: stats.Duration.String(),
TotalReceivedBytes: stats.TotalReceivedBytes,

Expand All @@ -215,6 +215,7 @@ func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Res

if rawDataFlagIncluded {
output.LatenciesByURL = stats.LatenciesByURL
output.Errors = stats.Errors
}

encoder := json.NewEncoder(f)
Expand Down
45 changes: 31 additions & 14 deletions metrics/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"container/list"
"sync"
"sync/atomic"
"time"

"github.com/Azure/kperf/api/types"
)
Expand All @@ -16,7 +17,7 @@ type ResponseMetric interface {
// ObserveLatency observes latency.
ObserveLatency(url string, seconds float64)
// ObserveFailure observes failure response.
ObserveFailure(err error)
ObserveFailure(now time.Time, seconds float64, err error)
// ObserveReceivedBytes observes the bytes read from apiserver.
ObserveReceivedBytes(bytes int64)
// Gather returns the summary.
Expand All @@ -25,14 +26,14 @@ type ResponseMetric interface {

type responseMetricImpl struct {
mu sync.Mutex
errorStats *types.ResponseErrorStats
errors *list.List
receivedBytes int64
latenciesByURLs map[string]*list.List
}

func NewResponseMetric() ResponseMetric {
return &responseMetricImpl{
errorStats: types.NewResponseErrorStats(),
errors: list.New(),
latenciesByURLs: map[string]*list.List{},
}
}
Expand All @@ -51,26 +52,38 @@ func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) {
}

// ObserveFailure implements ResponseMetric.
func (m *responseMetricImpl) ObserveFailure(err error) {
func (m *responseMetricImpl) ObserveFailure(now time.Time, seconds float64, err error) {
if err == nil {
return
}

m.mu.Lock()
defer m.mu.Unlock()

// HTTP2 -> TCP/TLS -> Unknown
oerr := types.ResponseError{
Timestamp: now,
Duration: seconds,
}

// HTTP Code -> HTTP2 -> Connection -> Unknown
code := codeFromHTTP(err)
http2Err, isHTTP2Err := isHTTP2Error(err)
connErr, isConnErr := isConnectionError(err)
switch {
case code != 0:
m.errorStats.ResponseCodes[code]++
case isHTTP2Error(err):
updateHTTP2ErrorStats(m.errorStats, err)
case isNetRelatedError(err):
updateNetErrors(m.errorStats, err)
oerr.Type = types.ResponseErrorTypeHTTP
oerr.Code = code
case isHTTP2Err:
oerr.Type = types.ResponseErrorTypeHTTP2Protocol
oerr.Message = http2Err
case isConnErr:
oerr.Type = types.ResponseErrorTypeConnection
oerr.Message = connErr
default:
m.errorStats.UnknownErrors = append(m.errorStats.UnknownErrors, err.Error())
oerr.Type = types.ResponseErrorTypeUnknown
oerr.Message = err.Error()
}
m.errors.PushBack(oerr)
}

// ObserveReceivedBytes implements ResponseMetric.
Expand All @@ -81,7 +94,7 @@ func (m *responseMetricImpl) ObserveReceivedBytes(bytes int64) {
// Gather implements ResponseMetric.
func (m *responseMetricImpl) Gather() types.ResponseStats {
return types.ResponseStats{
ErrorStats: m.dumpErrorStats(),
Errors: m.dumpErrors(),
LatenciesByURL: m.dumpLatencies(),
TotalReceivedBytes: atomic.LoadInt64(&m.receivedBytes),
}
Expand All @@ -102,9 +115,13 @@ func (m *responseMetricImpl) dumpLatencies() map[string][]float64 {
return res
}

func (m *responseMetricImpl) dumpErrorStats() types.ResponseErrorStats {
func (m *responseMetricImpl) dumpErrors() []types.ResponseError {
m.mu.Lock()
defer m.mu.Unlock()

return m.errorStats.Copy()
res := make([]types.ResponseError, 0, m.errors.Len())
for e := m.errors.Front(); e != nil; e = e.Next() {
res = append(res, e.Value.(types.ResponseError))
}
return res
}
Loading

0 comments on commit a182551

Please sign in to comment.