diff --git a/api/types/metric.go b/api/types/metric.go index 6cf0fe4..27c1af9 100644 --- a/api/types/metric.go +++ b/api/types/metric.go @@ -3,71 +3,43 @@ package types -// HTTP2ErrorStats is the report about http2 error during testing. -type HTTP2ErrorStats struct { - // ConnectionErrors represents connection level errors. - ConnectionErrors map[string]int32 `json:"connectionErrors,omitempty"` - // StreamErrors represents stream level errors. - StreamErrors map[string]int32 `json:"streamErrors,omitempty"` -} - -// NewHTTP2ErrorStats returns new instance of HTTP2ErrorStats. -func NewHTTP2ErrorStats() *HTTP2ErrorStats { - return &HTTP2ErrorStats{ - ConnectionErrors: make(map[string]int32, 10), - StreamErrors: make(map[string]int32, 10), - } -} - -// ResponseErrorStats is the report about errors. -type ResponseErrorStats struct { - // UnknownErrors is all unknown errors. - UnknownErrors []string `json:"unknownErrors"` - // NetErrors is to track errors from net. - NetErrors map[string]int32 `json:"netErrors"` - // ResponseCodes records request number grouped by response - // code between 400 and 600. - ResponseCodes map[int]int32 `json:"responseCodes"` - // HTTP2Errors records http2 related errors. - HTTP2Errors HTTP2ErrorStats `json:"http2Errors"` -} +import "time" -// NewResponseErrorStats returns empty ResponseErrorStats. -func NewResponseErrorStats() *ResponseErrorStats { - return &ResponseErrorStats{ - UnknownErrors: make([]string, 0, 1024), - NetErrors: make(map[string]int32, 10), - ResponseCodes: map[int]int32{}, - HTTP2Errors: *NewHTTP2ErrorStats(), - } -} +// ResponseErrorType is error type of response. +type ResponseErrorType string -// Copy clones self. -func (r *ResponseErrorStats) Copy() ResponseErrorStats { - res := NewResponseErrorStats() - - res.UnknownErrors = make([]string, len(r.UnknownErrors)) - copy(res.UnknownErrors, r.UnknownErrors) - res.NetErrors = cloneMap(r.NetErrors) - res.ResponseCodes = cloneMap(r.ResponseCodes) - res.HTTP2Errors.ConnectionErrors = cloneMap(r.HTTP2Errors.ConnectionErrors) - res.HTTP2Errors.StreamErrors = cloneMap(r.HTTP2Errors.StreamErrors) - return *res -} +const ( + // ResponseErrorTypeUnknown indicates we don't have correct category for errors. + ResponseErrorTypeUnknown ResponseErrorType = "unknown" + // ResponseErrorTypeHTTP indicates that the response returns http code >= 400. + ResponseErrorTypeHTTP ResponseErrorType = "http" + // ResponseErrorTypeHTTP2Protocol indicates that error comes from http2 layer. + ResponseErrorTypeHTTP2Protocol ResponseErrorType = "http2-protocol" + // ResponseErrorTypeConnection indicates that error is related to connection. + // For instance, connection refused caused by server down. + ResponseErrorTypeConnection ResponseErrorType = "connection" +) -// Merge merges two ResponseErrorStats. -func (r *ResponseErrorStats) Merge(from *ResponseErrorStats) { - r.UnknownErrors = append(r.UnknownErrors, from.UnknownErrors...) - mergeMap(r.NetErrors, from.NetErrors) - mergeMap(r.ResponseCodes, from.ResponseCodes) - mergeMap(r.HTTP2Errors.ConnectionErrors, from.HTTP2Errors.ConnectionErrors) - mergeMap(r.HTTP2Errors.StreamErrors, from.HTTP2Errors.StreamErrors) +// ResponseError is the record about that error. +type ResponseError struct { + // Timestamp indicates when this error was received. + Timestamp time.Time `json:"timestamp"` + // Duration records timespan in seconds. + Duration float64 `json:"duration"` + // Type indicates that category to which the error belongs. + Type ResponseErrorType `json:"type"` + // Code only works when Type is http. + Code int `json:"code,omitempty"` + // Message shows error message for this error. + // + // NOTE: When Type is http, this field will be empty. + Message string `json:"message,omitempty"` } // ResponseStats is the report about benchmark result. type ResponseStats struct { - // ErrorStats means summary of errors. - ErrorStats ResponseErrorStats + // Errors stores all the observed errors. + Errors []ResponseError // LatenciesByURL stores all the observed latencies for each request. LatenciesByURL map[string][]float64 // TotalReceivedBytes is total bytes read from apiserver. @@ -79,8 +51,10 @@ type RunnerMetricReport struct { Total int `json:"total"` // Duration means the time of benchmark. Duration string `json:"duration"` - // ErrorStats means summary of errors. - ErrorStats ResponseErrorStats `json:"errorStats"` + // Errors stores all the observed errors. + Errors []ResponseError `json:"errors,omitempty"` + // ErrorStats means summary of errors group by type. + ErrorStats map[string]int32 `json:"errorStats,omitempty"` // TotalReceivedBytes is total bytes read from apiserver. TotalReceivedBytes int64 `json:"totalReceivedBytes"` // LatenciesByURL stores all the observed latencies. diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index edf3c1f..d04546c 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -192,7 +192,7 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error { output := types.RunnerMetricReport{ Total: stats.Total, - ErrorStats: stats.ErrorStats, + ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors), Duration: stats.Duration.String(), TotalReceivedBytes: stats.TotalReceivedBytes, @@ -215,6 +215,7 @@ func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Res if rawDataFlagIncluded { output.LatenciesByURL = stats.LatenciesByURL + output.Errors = stats.Errors } encoder := json.NewEncoder(f) diff --git a/metrics/request.go b/metrics/request.go index 571c4fd..979ad22 100644 --- a/metrics/request.go +++ b/metrics/request.go @@ -7,6 +7,7 @@ import ( "container/list" "sync" "sync/atomic" + "time" "github.com/Azure/kperf/api/types" ) @@ -16,7 +17,7 @@ type ResponseMetric interface { // ObserveLatency observes latency. ObserveLatency(url string, seconds float64) // ObserveFailure observes failure response. - ObserveFailure(err error) + ObserveFailure(now time.Time, seconds float64, err error) // ObserveReceivedBytes observes the bytes read from apiserver. ObserveReceivedBytes(bytes int64) // Gather returns the summary. @@ -25,14 +26,14 @@ type ResponseMetric interface { type responseMetricImpl struct { mu sync.Mutex - errorStats *types.ResponseErrorStats + errors *list.List receivedBytes int64 latenciesByURLs map[string]*list.List } func NewResponseMetric() ResponseMetric { return &responseMetricImpl{ - errorStats: types.NewResponseErrorStats(), + errors: list.New(), latenciesByURLs: map[string]*list.List{}, } } @@ -51,7 +52,7 @@ func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) { } // ObserveFailure implements ResponseMetric. -func (m *responseMetricImpl) ObserveFailure(err error) { +func (m *responseMetricImpl) ObserveFailure(now time.Time, seconds float64, err error) { if err == nil { return } @@ -59,18 +60,30 @@ func (m *responseMetricImpl) ObserveFailure(err error) { m.mu.Lock() defer m.mu.Unlock() - // HTTP2 -> TCP/TLS -> Unknown + oerr := types.ResponseError{ + Timestamp: now, + Duration: seconds, + } + + // HTTP Code -> HTTP2 -> Connection -> Unknown code := codeFromHTTP(err) + http2Err, isHTTP2Err := isHTTP2Error(err) + connErr, isConnErr := isConnectionError(err) switch { case code != 0: - m.errorStats.ResponseCodes[code]++ - case isHTTP2Error(err): - updateHTTP2ErrorStats(m.errorStats, err) - case isNetRelatedError(err): - updateNetErrors(m.errorStats, err) + oerr.Type = types.ResponseErrorTypeHTTP + oerr.Code = code + case isHTTP2Err: + oerr.Type = types.ResponseErrorTypeHTTP2Protocol + oerr.Message = http2Err + case isConnErr: + oerr.Type = types.ResponseErrorTypeConnection + oerr.Message = connErr default: - m.errorStats.UnknownErrors = append(m.errorStats.UnknownErrors, err.Error()) + oerr.Type = types.ResponseErrorTypeUnknown + oerr.Message = err.Error() } + m.errors.PushBack(oerr) } // ObserveReceivedBytes implements ResponseMetric. @@ -81,7 +94,7 @@ func (m *responseMetricImpl) ObserveReceivedBytes(bytes int64) { // Gather implements ResponseMetric. func (m *responseMetricImpl) Gather() types.ResponseStats { return types.ResponseStats{ - ErrorStats: m.dumpErrorStats(), + Errors: m.dumpErrors(), LatenciesByURL: m.dumpLatencies(), TotalReceivedBytes: atomic.LoadInt64(&m.receivedBytes), } @@ -102,9 +115,13 @@ func (m *responseMetricImpl) dumpLatencies() map[string][]float64 { return res } -func (m *responseMetricImpl) dumpErrorStats() types.ResponseErrorStats { +func (m *responseMetricImpl) dumpErrors() []types.ResponseError { m.mu.Lock() defer m.mu.Unlock() - return m.errorStats.Copy() + res := make([]types.ResponseError, 0, m.errors.Len()) + for e := m.errors.Front(); e != nil; e = e.Next() { + res = append(res, e.Value.(types.ResponseError)) + } + return res } diff --git a/metrics/request_test.go b/metrics/request_test.go index c22ca57..557923e 100644 --- a/metrics/request_test.go +++ b/metrics/request_test.go @@ -10,6 +10,7 @@ import ( "io" "syscall" "testing" + "time" "github.com/Azure/kperf/api/types" @@ -19,31 +20,99 @@ import ( ) func TestResponseMetric_ObserveFailure(t *testing.T) { - expectedStats := types.ResponseErrorStats{ - UnknownErrors: []string{ - "unknown", - }, - ResponseCodes: map[int]int32{ - 429: 1, - 500: 1, - 504: 1, - }, - NetErrors: map[string]int32{ - "net/http: TLS handshake timeout": 2, - "connection reset by peer": 1, - "connection refused": 1, - "unexpected EOF": 1, - "context deadline exceeded": 1, - }, - HTTP2Errors: types.HTTP2ErrorStats{ - ConnectionErrors: map[string]int32{ - "http2: client connection lost": 2, - "http2: server sent GOAWAY and closed the connection; ErrCode=NO_ERROR, debug=\"\"": 1, - "http2: server sent GOAWAY and closed the connection; ErrCode=PROTOCOL_ERROR, debug=\"\"": 1, - }, - StreamErrors: map[string]int32{ - "CONNECT_ERROR": 1, - }, + observedAt := time.Now() + dur := 10 * time.Second + + expectedErrors := []types.ResponseError{ + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP, + Code: 429, + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP, + Code: 500, + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP, + Code: 504, + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: server sent GOAWAY and closed the connection; ErrCode=NO_ERROR, debug=", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: server sent GOAWAY and closed the connection; ErrCode=PROTOCOL_ERROR, debug=", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: client connection lost", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: client connection lost", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: http2.ErrCode(10).String(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: "net/http: TLS handshake timeout", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: "net/http: TLS handshake timeout", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: "context deadline exceeded", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: syscall.ECONNRESET.Error(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: syscall.ECONNREFUSED.Error(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: io.ErrUnexpectedEOF.Error(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeUnknown, + Message: "unknown", }, } @@ -82,8 +151,8 @@ func TestResponseMetric_ObserveFailure(t *testing.T) { m := NewResponseMetric() for _, err := range errs { - m.ObserveFailure(err) + m.ObserveFailure(observedAt, dur.Seconds(), err) } - stats := m.Gather().ErrorStats - assert.Equal(t, expectedStats, stats) + errors := m.Gather().Errors + assert.Equal(t, expectedErrors, errors) } diff --git a/metrics/utils.go b/metrics/utils.go index f0c4f3f..d2b9b14 100644 --- a/metrics/utils.go +++ b/metrics/utils.go @@ -15,7 +15,6 @@ import ( "syscall" "github.com/Azure/kperf/api/types" - "golang.org/x/net/http2" apierrors "k8s.io/apimachinery/pkg/api/errors" ) @@ -42,6 +41,23 @@ func BuildPercentileLatencies(latencies []float64) [][2]float64 { return res } +// BuildErrorStatsGroupByType summaries total count for each type of errors. +func BuildErrorStatsGroupByType(errors []types.ResponseError) map[string]int32 { + res := map[string]int32{} + + for _, err := range errors { + var key string + switch err.Type { + case types.ResponseErrorTypeHTTP: + key = fmt.Sprintf("%s/%d", err.Type, err.Code) + default: + key = fmt.Sprintf("%s/%s", err.Type, err.Message) + } + res[key]++ + } + return res +} + var ( // errHTTP2ClientConnectionLost is used to track unexported http2 error. errHTTP2ClientConnectionLost = errors.New("http2: client connection lost") @@ -95,84 +111,55 @@ func codeFromHTTP(err error) int { } } -// updateHTTP2ErrorStats updates stats if err is http2 error. -func updateHTTP2ErrorStats(stats *types.ResponseErrorStats, err error) { +// isHTTP2Error returns true if it's related to http2 error. +func isHTTP2Error(err error) (string, bool) { + if err == nil { + return "", false + } + if connErr, ok := err.(http2.ConnectionError); ok || errors.As(err, &connErr) { - stats.HTTP2Errors.ConnectionErrors[(http2.ErrCode(connErr)).String()]++ - return + return (http2.ErrCode(connErr)).String(), true } if streamErr, ok := err.(http2.StreamError); ok || errors.As(err, &streamErr) { - stats.HTTP2Errors.StreamErrors[streamErr.Code.String()]++ - return + return streamErr.Code.String(), true } if connErr, ok := err.(http2.GoAwayError); ok || errors.As(err, &connErr) { - stats.HTTP2Errors.ConnectionErrors[fmt.Sprintf("http2: server sent GOAWAY and closed the connection; ErrCode=%v, debug=%q", connErr.ErrCode, connErr.DebugData)]++ - return + return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; ErrCode=%v, debug=%s", + connErr.ErrCode, connErr.DebugData), true } if strings.Contains(err.Error(), errHTTP2ClientConnectionLost.Error()) { - stats.HTTP2Errors.ConnectionErrors[errHTTP2ClientConnectionLost.Error()]++ + return errHTTP2ClientConnectionLost.Error(), true } + return "", false } -// updateNetErrors updates stats if err is related net error. -func updateNetErrors(stats *types.ResponseErrorStats, err error) { +// isConnectionError returns true if it's related to connection error. +func isConnectionError(err error) (string, bool) { if err == nil { - return + return "", false } - errInStr := err.Error() switch { case isTimeoutError(err): - stats.NetErrors[err.Error()]++ - case errors.Is(err, io.ErrUnexpectedEOF): - stats.NetErrors[io.ErrUnexpectedEOF.Error()]++ + return err.Error(), true case isConnectionRefused(err): - stats.NetErrors[syscall.ECONNREFUSED.Error()]++ + return syscall.ECONNREFUSED.Error(), true case isConnectionResetByPeer(err): - stats.NetErrors[syscall.ECONNRESET.Error()]++ - case strings.Contains(errInStr, errTLSHandshakeTimeout.Error()): - stats.NetErrors[errTLSHandshakeTimeout.Error()]++ + return syscall.ECONNRESET.Error(), true + case errors.Is(err, io.ErrUnexpectedEOF): + return io.ErrUnexpectedEOF.Error(), true + case errors.Is(err, io.EOF): + return io.EOF.Error(), true + case strings.Contains(err.Error(), errTLSHandshakeTimeout.Error()): + return errTLSHandshakeTimeout.Error(), true default: - // TODO(weifu): add more categories. + return "", false } } -// isHTTP2Error returns true if it's related to http2 error. -func isHTTP2Error(err error) bool { - if err == nil { - return false - } - - if connErr, ok := err.(http2.ConnectionError); ok || errors.As(err, &connErr) { - return true - } - - if streamErr, ok := err.(http2.StreamError); ok || errors.As(err, &streamErr) { - return true - } - - if connErr, ok := err.(http2.GoAwayError); ok || errors.As(err, &connErr) { - return true - } - - if strings.Contains(err.Error(), errHTTP2ClientConnectionLost.Error()) { - return true - } - return false -} - -// isNetRelatedError returns true if it's related to net error. -func isNetRelatedError(err error) bool { - return err != nil && (isTimeoutError(err) || - isConnectionRefused(err) || - isConnectionResetByPeer(err) || - errors.Is(err, io.ErrUnexpectedEOF) || - strings.Contains(err.Error(), errTLSHandshakeTimeout.Error())) -} - // isTimeoutError returns true if it's related to golang standard library // net's timeout error. func isTimeoutError(err error) bool { diff --git a/request/schedule.go b/request/schedule.go index ac06fd1..80f345a 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -104,11 +104,12 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I err = nil } } - latency := time.Since(start).Seconds() + end := time.Now() + latency := end.Sub(start).Seconds() respMetric.ObserveReceivedBytes(bytes) if err != nil { - respMetric.ObserveFailure(err) + respMetric.ObserveFailure(end, latency, err) klog.V(5).Infof("Request stream failed: %v", err) return } diff --git a/runner/utils.go b/runner/utils.go index 18ef5df..7ea6ff8 100644 --- a/runner/utils.go +++ b/runner/utils.go @@ -63,7 +63,8 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type totalBytes := int64(0) totalResp := 0 latenciesByURL := map[string]*list.List{} - errStats := types.NewResponseErrorStats() + errs := []types.ResponseError{} + errStats := map[string]int32{} maxDuration := 0 * time.Second for idx := range groups { @@ -107,7 +108,9 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type } // update error stats - errStats.Merge(&report.ErrorStats) + mergeErrorStat(errStats, report.ErrorStats) + errs = append(errs, report.Errors...) + report.Errors = nil // update max duration rDur, err := time.ParseDuration(report.Duration) @@ -133,7 +136,8 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type return &types.RunnerMetricReport{ Total: totalResp, - ErrorStats: *errStats, + Errors: errs, + ErrorStats: errStats, Duration: maxDuration.String(), TotalReceivedBytes: totalBytes, PercentileLatencies: metrics.BuildPercentileLatencies(latencies), @@ -150,6 +154,13 @@ func listToSliceFloat64(l *list.List) []float64 { return res } +// mergeErrorStat merges two error stats. +func mergeErrorStat(s, d map[string]int32) { + for e, n := range d { + s[e] += n + } +} + // readBlob reads blob data from localstore. func readBlob(s *localstore.Store, ref string) ([]byte, error) { r, err := s.OpenReader(ref)