From 411bb359b6dac6a1dee3fc43e02e5f13a1fa73b9 Mon Sep 17 00:00:00 2001 From: manasachinta Date: Thu, 28 Dec 2023 15:34:20 -0500 Subject: [PATCH 1/5] Added failure list code, tested --- api/types/metric.go | 2 ++ cmd/kperf/commands/runner/runner.go | 5 +++++ metrics/request.go | 15 +++++++++------ metrics/request_test.go | 2 +- request/schedule.go | 5 +++-- tmp/xx.yaml | 17 +++++++++++++++++ 6 files changed, 37 insertions(+), 9 deletions(-) create mode 100644 tmp/xx.yaml diff --git a/api/types/metric.go b/api/types/metric.go index d6eb2c1..a5f91be 100644 --- a/api/types/metric.go +++ b/api/types/metric.go @@ -8,6 +8,8 @@ type ResponseStats struct { Total int // Failures represents number of failure request. Failures int + // List of failures + FailureList []error // Duration means the time of benchmark. Duration time.Duration // PercentileLatencies represents the latency distribution in seconds. diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index 0acea61..67e1043 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -116,6 +116,11 @@ func printResponseStats(stats *types.ResponseStats) { fmt.Println("Response stat:") fmt.Printf(" Total: %v\n", stats.Total) fmt.Printf(" Failures: %v\n", stats.Failures) + fmt.Println("==========================") + for i, v := range stats.FailureList { + fmt.Printf(" Failure %d: %v\n", i+1, v) + } + fmt.Println("==========================") fmt.Printf(" Duration: %v\n", stats.Duration) fmt.Printf(" Requests/sec: %.2f\n", float64(stats.Total)/stats.Duration.Seconds()) diff --git a/metrics/request.go b/metrics/request.go index 374c929..5efd275 100644 --- a/metrics/request.go +++ b/metrics/request.go @@ -13,20 +13,22 @@ type ResponseMetric interface { // ObserveLatency observes latency. ObserveLatency(seconds float64) // ObserveFailure observes failure response. - ObserveFailure() + ObserveFailure(err error) // Gather returns the summary. - Gather() (latencies []float64, percentileLatencies map[float64]float64, failure int) + Gather() (latencies []float64, percentileLatencies map[float64]float64, failure int, failureList []error) } type responseMetricImpl struct { mu sync.Mutex failureCount int64 + failureList []error latencies *list.List } func NewResponseMetric() ResponseMetric { return &responseMetricImpl{ - latencies: list.New(), + latencies: list.New(), + failureList: []error{}, } } @@ -38,15 +40,16 @@ func (m *responseMetricImpl) ObserveLatency(seconds float64) { } // ObserveFailure implements ResponseMetric. -func (m *responseMetricImpl) ObserveFailure() { +func (m *responseMetricImpl) ObserveFailure(err error) { + m.failureList = append(m.failureList, err) atomic.AddInt64(&m.failureCount, 1) } // Gather implements ResponseMetric. -func (m *responseMetricImpl) Gather() ([]float64, map[float64]float64, int) { +func (m *responseMetricImpl) Gather() ([]float64, map[float64]float64, int, []error) { latencies := m.dumpLatencies() - return latencies, buildPercentileLatencies(latencies), int(atomic.LoadInt64(&m.failureCount)) + return latencies, buildPercentileLatencies(latencies), int(atomic.LoadInt64(&m.failureCount)), m.failureList } func (m *responseMetricImpl) dumpLatencies() []float64 { diff --git a/metrics/request_test.go b/metrics/request_test.go index 2cd8e15..eb4ad27 100644 --- a/metrics/request_test.go +++ b/metrics/request_test.go @@ -38,7 +38,7 @@ func TestResponseMetric(t *testing.T) { c.ObserveLatency(float64(i)) } - _, res, _ := c.Gather() + _, res, _, _ := c.Gather() assert.Equal(t, float64(1), res[0]) assert.Equal(t, float64(50), res[50]) assert.Equal(t, float64(90), res[90]) diff --git a/request/schedule.go b/request/schedule.go index 246fcc1..5f171bd 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -65,7 +65,7 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I _, err = io.Copy(io.Discard, respBody) } if err != nil { - respMetric.ObserveFailure() + respMetric.ObserveFailure(err) } }() } @@ -80,10 +80,11 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I totalDuration := time.Since(start) - _, percentileLatencies, failures := respMetric.Gather() + _, percentileLatencies, failures, failureList := respMetric.Gather() return &types.ResponseStats{ Total: spec.Total, Failures: failures, + FailureList: failureList, Duration: totalDuration, PercentileLatencies: percentileLatencies, }, nil diff --git a/tmp/xx.yaml b/tmp/xx.yaml new file mode 100644 index 0000000..23da4f1 --- /dev/null +++ b/tmp/xx.yaml @@ -0,0 +1,17 @@ +version: 1 +description: test +spec: + rate: 100 + total: 10 + conns: 10 + requests: + - staleList: + version: v1 + resource: pods + limit: 500 + shares: 100 + - quorumList: + version: v1 + resource: pods + limit: 1000 + shares: 150 From b53f59ca25666989c4a7985b7464cc88a98bf864 Mon Sep 17 00:00:00 2001 From: manasachinta Date: Mon, 1 Jan 2024 14:10:35 -0500 Subject: [PATCH 2/5] Removed failureCount field in ResponseMetric and ResponseStat, added mutex for ObserveFailure() --- api/types/metric.go | 2 -- cmd/kperf/commands/runner/runner.go | 2 +- metrics/request.go | 16 ++++++---------- metrics/request_test.go | 2 +- request/schedule.go | 8 ++++++-- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/api/types/metric.go b/api/types/metric.go index a5f91be..f5ad2ed 100644 --- a/api/types/metric.go +++ b/api/types/metric.go @@ -6,8 +6,6 @@ import "time" type ResponseStats struct { // Total represents total number of requests. Total int - // Failures represents number of failure request. - Failures int // List of failures FailureList []error // Duration means the time of benchmark. diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index 67e1043..1e659ca 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -115,7 +115,7 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { func printResponseStats(stats *types.ResponseStats) { fmt.Println("Response stat:") fmt.Printf(" Total: %v\n", stats.Total) - fmt.Printf(" Failures: %v\n", stats.Failures) + fmt.Printf(" Total Failures: %v\n", len(stats.FailureList)) fmt.Println("==========================") for i, v := range stats.FailureList { fmt.Printf(" Failure %d: %v\n", i+1, v) diff --git a/metrics/request.go b/metrics/request.go index 5efd275..eec8698 100644 --- a/metrics/request.go +++ b/metrics/request.go @@ -5,7 +5,6 @@ import ( "math" "sort" "sync" - "sync/atomic" ) // ResponseMetric is a measurement related to http response. @@ -15,14 +14,13 @@ type ResponseMetric interface { // ObserveFailure observes failure response. ObserveFailure(err error) // Gather returns the summary. - Gather() (latencies []float64, percentileLatencies map[float64]float64, failure int, failureList []error) + Gather() (latencies []float64, percentileLatencies map[float64]float64, failureList []error) } type responseMetricImpl struct { - mu sync.Mutex - failureCount int64 - failureList []error - latencies *list.List + mu sync.Mutex + failureList []error + latencies *list.List } func NewResponseMetric() ResponseMetric { @@ -42,14 +40,12 @@ func (m *responseMetricImpl) ObserveLatency(seconds float64) { // ObserveFailure implements ResponseMetric. func (m *responseMetricImpl) ObserveFailure(err error) { m.failureList = append(m.failureList, err) - atomic.AddInt64(&m.failureCount, 1) } // Gather implements ResponseMetric. -func (m *responseMetricImpl) Gather() ([]float64, map[float64]float64, int, []error) { +func (m *responseMetricImpl) Gather() ([]float64, map[float64]float64, []error) { latencies := m.dumpLatencies() - - return latencies, buildPercentileLatencies(latencies), int(atomic.LoadInt64(&m.failureCount)), m.failureList + return latencies, buildPercentileLatencies(latencies), m.failureList } func (m *responseMetricImpl) dumpLatencies() []float64 { diff --git a/metrics/request_test.go b/metrics/request_test.go index eb4ad27..2cd8e15 100644 --- a/metrics/request_test.go +++ b/metrics/request_test.go @@ -38,7 +38,7 @@ func TestResponseMetric(t *testing.T) { c.ObserveLatency(float64(i)) } - _, res, _, _ := c.Gather() + _, res, _ := c.Gather() assert.Equal(t, float64(1), res[0]) assert.Equal(t, float64(50), res[50]) assert.Equal(t, float64(90), res[90]) diff --git a/request/schedule.go b/request/schedule.go index 5f171bd..1e65249 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -16,6 +16,8 @@ import ( const defaultTimeout = 60 * time.Second +var m sync.Mutex + // Schedule files requests to apiserver based on LoadProfileSpec. func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.Interface) (*types.ResponseStats, error) { ctx, cancel := context.WithCancel(ctx) @@ -64,8 +66,11 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I // we don't need that unmarshal object. _, err = io.Copy(io.Discard, respBody) } + if err != nil { + m.Lock() respMetric.ObserveFailure(err) + m.Unlock() } }() } @@ -80,10 +85,9 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I totalDuration := time.Since(start) - _, percentileLatencies, failures, failureList := respMetric.Gather() + _, percentileLatencies, failureList := respMetric.Gather() return &types.ResponseStats{ Total: spec.Total, - Failures: failures, FailureList: failureList, Duration: totalDuration, PercentileLatencies: percentileLatencies, From cff5f226666674903ce03d0eeaea743da2c1c9ed Mon Sep 17 00:00:00 2001 From: manasachinta Date: Tue, 2 Jan 2024 00:31:38 -0500 Subject: [PATCH 3/5] Adding mutex inside ObserveFailure() instead of where it is called --- metrics/request.go | 2 ++ request/schedule.go | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metrics/request.go b/metrics/request.go index eec8698..7f425ab 100644 --- a/metrics/request.go +++ b/metrics/request.go @@ -39,6 +39,8 @@ func (m *responseMetricImpl) ObserveLatency(seconds float64) { // ObserveFailure implements ResponseMetric. func (m *responseMetricImpl) ObserveFailure(err error) { + m.mu.Lock() + defer m.mu.Unlock() m.failureList = append(m.failureList, err) } diff --git a/request/schedule.go b/request/schedule.go index 1e65249..64b7553 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -68,9 +68,7 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I } if err != nil { - m.Lock() respMetric.ObserveFailure(err) - m.Unlock() } }() } From f061cfbb502765ffd1828a7c483f298056e4f8f9 Mon Sep 17 00:00:00 2001 From: manasachinta Date: Tue, 2 Jan 2024 00:44:18 -0500 Subject: [PATCH 4/5] addressing more comments --- .gitignore | 3 +++ cmd/kperf/commands/runner/runner.go | 8 +++----- metrics/request.go | 3 ++- tmp/xx.yaml | 17 ----------------- 4 files changed, 8 insertions(+), 23 deletions(-) delete mode 100644 tmp/xx.yaml diff --git a/.gitignore b/.gitignore index 0bfa83c..1306df1 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,6 @@ bin/ # Go workspace file go.work + +#tmp folder which contains .yaml files +tmp/ \ No newline at end of file diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index 1e659ca..0745433 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -115,12 +115,10 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { func printResponseStats(stats *types.ResponseStats) { fmt.Println("Response stat:") fmt.Printf(" Total: %v\n", stats.Total) - fmt.Printf(" Total Failures: %v\n", len(stats.FailureList)) - fmt.Println("==========================") - for i, v := range stats.FailureList { - fmt.Printf(" Failure %d: %v\n", i+1, v) + fmt.Printf(" Total Failures: %v\n", len(stats.FailureList)) + for _, v := range stats.FailureList { + fmt.Printf(" Failure: %v\n", v) } - fmt.Println("==========================") fmt.Printf(" Duration: %v\n", stats.Duration) fmt.Printf(" Requests/sec: %.2f\n", float64(stats.Total)/stats.Duration.Seconds()) diff --git a/metrics/request.go b/metrics/request.go index 7f425ab..80d7712 100644 --- a/metrics/request.go +++ b/metrics/request.go @@ -24,9 +24,10 @@ type responseMetricImpl struct { } func NewResponseMetric() ResponseMetric { + errList := make([]error, 0, 1024) return &responseMetricImpl{ latencies: list.New(), - failureList: []error{}, + failureList: errList, } } diff --git a/tmp/xx.yaml b/tmp/xx.yaml deleted file mode 100644 index 23da4f1..0000000 --- a/tmp/xx.yaml +++ /dev/null @@ -1,17 +0,0 @@ -version: 1 -description: test -spec: - rate: 100 - total: 10 - conns: 10 - requests: - - staleList: - version: v1 - resource: pods - limit: 500 - shares: 100 - - quorumList: - version: v1 - resource: pods - limit: 1000 - shares: 150 From 668d307e9149b0d81014be62dc75f43353010942 Mon Sep 17 00:00:00 2001 From: manasachinta Date: Wed, 3 Jan 2024 09:02:54 -0500 Subject: [PATCH 5/5] removing unused mutex variable in schedule.go --- request/schedule.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/request/schedule.go b/request/schedule.go index 64b7553..7a2a83d 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -16,8 +16,6 @@ import ( const defaultTimeout = 60 * time.Second -var m sync.Mutex - // Schedule files requests to apiserver based on LoadProfileSpec. func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.Interface) (*types.ResponseStats, error) { ctx, cancel := context.WithCancel(ctx)