diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go index 5ebd240ab4f..2a005c263f2 100644 --- a/metrics/engine/engine.go +++ b/metrics/engine/engine.go @@ -54,8 +54,8 @@ type MetricsEngine struct { // they can be both top-level metrics or sub-metrics // // TODO: remove the tracked map using the sequence number - metricsWithThresholds map[string]metrics.Thresholds - trackedMetrics map[string]*trackedMetric + metricsWithThresholds map[uint64]metrics.Thresholds + trackedMetrics []*trackedMetric breachedThresholdsCount uint32 } @@ -65,8 +65,7 @@ func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) { me := &MetricsEngine{ test: runState, logger: runState.Logger.WithField("component", "metrics-engine"), - metricsWithThresholds: make(map[string]metrics.Thresholds), - trackedMetrics: make(map[string]*trackedMetric), + metricsWithThresholds: make(map[uint64]metrics.Thresholds), } if me.test.RuntimeOptions.NoSummary.Bool && @@ -74,14 +73,19 @@ func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) { return me, nil } - for _, registered := range me.test.Registry.All() { - typ := registered.Type - me.trackedMetrics[registered.Name] = &trackedMetric{ - Metric: registered, - sink: metrics.NewSinkByType(typ), - } + // It adds all the registered metrics as tracked + // the custom metrics are also added because they have + // been seen and registered during the initEnv run + // that must run before this constructor is called. + registered := me.test.Registry.All() + me.trackedMetrics = make([]*trackedMetric, len(registered)+1) + for _, mreg := range registered { + me.trackMetric(mreg) } + // It adds register and tracks all the metrics defined by the thresholds. + // They are also marked as observed because + // the summary wants them also if they didn't receive any sample. err := me.initSubMetricsAndThresholds() if err != nil { return nil, err @@ -156,27 +160,19 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { return fmt.Errorf("invalid metric '%s' in threshold definitions: %w", metricName, err) } - // TODO: check and confirm that this check is not an issue if len(thresholds.Thresholds) > 0 { - me.metricsWithThresholds[metric.Name] = thresholds + me.metricsWithThresholds[metric.ID] = thresholds } // Mark the metric (and the parent metric, if we're dealing with a // submetric) as observed, so they are shown in the end-of-test summary, - // even if they don't have any metric samples during the test run - - me.trackedMetrics[metric.Name] = &trackedMetric{ - Metric: metric, - sink: metrics.NewSinkByType(metric.Type), - observed: true, - } + // even if they don't have any metric samples during the test run. + me.trackMetric(metric) + me.trackedMetrics[metric.ID].observed = true if metric.Sub != nil { - me.trackedMetrics[metric.Sub.Parent.Name] = &trackedMetric{ - Metric: metric.Sub.Parent, - sink: metrics.NewSinkByType(metric.Sub.Parent.Type), - observed: true, - } + me.trackMetric(metric.Sub.Parent) + me.trackedMetrics[metric.Sub.Parent.ID].observed = true } } @@ -187,15 +183,37 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { if err != nil { return err // shouldn't happen, but ¯\_(ツ)_/¯ } - me.trackedMetrics[expResMetric.Name] = &trackedMetric{ - Metric: expResMetric, - sink: metrics.NewSinkByType(expResMetric.Type), - } + me.trackMetric(expResMetric) } + // TODO: the trackedMetrics slice is fixed now + // to be optimal we could shrink the slice cap + return nil } +func (me *MetricsEngine) trackMetric(m *metrics.Metric) { + tm := &trackedMetric{ + Metric: m, + sink: metrics.NewSinkByType(m.Type), + } + + if me.trackedMetrics == nil { + // the Metric ID starts from one + // so it skips the zero-th position + // to simplify the access operations. + me.trackedMetrics = []*trackedMetric{nil} + } + + if m.ID >= uint64(len(me.trackedMetrics)) { + // expand the slice + me.trackedMetrics = append(me.trackedMetrics, tm) + return + } + + me.trackedMetrics[m.ID] = tm +} + // StartThresholdCalculations spins up a new goroutine to crunch thresholds and // returns a callback that will stop the goroutine and finalizes calculations. func (me *MetricsEngine) StartThresholdCalculations( @@ -261,40 +279,36 @@ func (me *MetricsEngine) evaluateThresholds( ) (breachedThresholds []string, shouldAbort bool) { t := getCurrentTestRunDuration() - computeThresholds := func(metricName string, ths metrics.Thresholds) { - observedMetric, ok := me.trackedMetrics[metricName] - if !ok { - panic(fmt.Sprintf("observed metric %q not found for the threhsolds", metricName)) - } - - observedMetric.m.Lock() - defer observedMetric.m.Unlock() + computeThresholds := func(tm *trackedMetric, ths metrics.Thresholds) { + tm.m.Lock() + defer tm.m.Unlock() // If either the metric has no thresholds defined, or its sinks // are empty, let's ignore its thresholds execution at this point. - if len(ths.Thresholds) == 0 || (ignoreEmptySinks && observedMetric.sink.IsEmpty()) { + if len(ths.Thresholds) == 0 || (ignoreEmptySinks && tm.sink.IsEmpty()) { return } - observedMetric.tainted = false + tm.tainted = false - succ, err := ths.Run(observedMetric.sink, t) + succ, err := ths.Run(tm.sink, t) if err != nil { - me.logger.WithField("metric_name", metricName).WithError(err).Error("Threshold error") + me.logger.WithField("metric_name", tm.Name).WithError(err).Error("Threshold error") return } if succ { return // threshold passed } - breachedThresholds = append(breachedThresholds, metricName) - observedMetric.tainted = true + breachedThresholds = append(breachedThresholds, tm.Name) + tm.tainted = true if ths.Abort { shouldAbort = true } } me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds)) - for m, ths := range me.metricsWithThresholds { - computeThresholds(m, ths) + for mid, ths := range me.metricsWithThresholds { + tracked := me.trackedMetrics[mid] + computeThresholds(tracked, ths) } if len(breachedThresholds) > 0 { @@ -308,7 +322,10 @@ func (me *MetricsEngine) evaluateThresholds( // ObservedMetrics returns all observed metrics. func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric { ometrics := make(map[string]metrics.ObservedMetric, len(me.trackedMetrics)) - for _, tm := range me.trackedMetrics { + + // it skips the first item as it is nil by definition + for i := 1; i < len(me.trackedMetrics); i++ { + tm := me.trackedMetrics[i] tm.m.Lock() if !tm.observed { tm.m.Unlock() @@ -322,11 +339,12 @@ func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric { // ObservedMetricByID returns the observed metric by the provided id. func (me *MetricsEngine) ObservedMetricByID(id string) (metrics.ObservedMetric, bool) { - tm, ok := me.trackedMetrics[id] - if !ok { + m := me.test.Registry.Get(id) + if m == nil { return metrics.ObservedMetric{}, false } + tm := me.trackedMetrics[m.ID] tm.m.Lock() defer tm.m.Unlock() @@ -361,7 +379,7 @@ func (me *MetricsEngine) trackedToObserved(tm *trackedMetric) metrics.ObservedMe Tainted: null.BoolFrom(tm.tainted), // TODO: if null it's required then add to trackedMetric } - definedThs, ok := me.metricsWithThresholds[tm.Name] + definedThs, ok := me.metricsWithThresholds[tm.ID] if !ok || len(definedThs.Thresholds) < 1 { return om } diff --git a/metrics/engine/engine_test.go b/metrics/engine/engine_test.go index 2756a72c708..505f0331241 100644 --- a/metrics/engine/engine_test.go +++ b/metrics/engine/engine_test.go @@ -117,22 +117,26 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) { t.Run(tc.threshold, func(t *testing.T) { t.Parallel() me := newTestMetricsEngine(t) + m1 := me.test.Registry.MustNewMetric("m1", metrics.Counter) + m2 := me.test.Registry.MustNewMetric("m2", metrics.Counter) ths := metrics.NewThresholds([]string{tc.threshold}) require.NoError(t, ths.Parse()) ths.Thresholds[0].AbortOnFail = tc.abortOnFail - me.metricsWithThresholds["m1"] = ths - me.metricsWithThresholds["m2"] = metrics.Thresholds{} + me.metricsWithThresholds[1] = ths + me.metricsWithThresholds[2] = metrics.Thresholds{} csink := &metrics.CounterSink{} csink.Add(metrics.Sample{Value: 6.0}) - me.trackedMetrics["m1"] = &trackedMetric{ - sink: csink, - } - me.trackedMetrics["m2"] = &trackedMetric{ - sink: &metrics.CounterSink{}, - } + me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{ + Metric: m1, + sink: csink, + }) + me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{ + Metric: m2, + sink: &metrics.CounterSink{}, + }) breached, abort := me.evaluateThresholds(false, zeroTestRunDuration) require.Equal(t, tc.abortOnFail, abort) @@ -155,18 +159,18 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) { require.NoError(t, ths.Parse()) ths.Thresholds[0].AbortOnFail = true - me.metricsWithThresholds["m1"] = ths - me.metricsWithThresholds["m2"] = metrics.Thresholds{} + me.metricsWithThresholds[1] = ths + me.metricsWithThresholds[2] = metrics.Thresholds{} - me.trackedMetrics["m1"] = &trackedMetric{ + me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{ Metric: m1, sink: &metrics.CounterSink{}, - } + }) - me.trackedMetrics["m2"] = &trackedMetric{ + me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{ Metric: m2, sink: &metrics.CounterSink{}, - } + }) breached, abort := me.evaluateThresholds(false, zeroTestRunDuration) require.True(t, abort) @@ -191,16 +195,16 @@ func TestMetricsEngineObserveMetricByID(t *testing.T) { require.NoError(t, ths.Parse()) ths.Thresholds[0].AbortOnFail = true - me.metricsWithThresholds["m1"] = metrics.Thresholds{} - me.metricsWithThresholds["m2"] = ths + me.metricsWithThresholds[1] = metrics.Thresholds{} + me.metricsWithThresholds[2] = ths - me.trackedMetrics["m1"] = &trackedMetric{ + me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{ Metric: m1, - } - me.trackedMetrics["m2"] = &trackedMetric{ + }) + me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{ Metric: m2, observed: true, - } + }) ometric, found := me.ObservedMetricByID("m2") require.True(t, found) @@ -208,6 +212,17 @@ func TestMetricsEngineObserveMetricByID(t *testing.T) { assert.Len(t, ometric.Thresholds, 1) } +func TestMetricsEngineTrackMetric(t *testing.T) { + t.Parallel() + + me := newTestMetricsEngine(t) + m := me.test.Registry.MustNewMetric("my_counter", metrics.Counter) + me.trackMetric(m) + require.Len(t, me.trackedMetrics, 2) + assert.Equal(t, m, me.trackedMetrics[1].Metric) + assert.IsType(t, &metrics.CounterSink{}, me.trackedMetrics[1].sink) +} + func newTestMetricsEngine(t *testing.T) MetricsEngine { trs := &lib.TestRunState{ TestPreInitState: &lib.TestPreInitState{ @@ -219,8 +234,8 @@ func newTestMetricsEngine(t *testing.T) MetricsEngine { return MetricsEngine{ logger: trs.Logger, test: trs, - metricsWithThresholds: make(map[string]metrics.Thresholds), - trackedMetrics: make(map[string]*trackedMetric), + metricsWithThresholds: make(map[uint64]metrics.Thresholds), + trackedMetrics: []*trackedMetric{nil}, } } diff --git a/metrics/engine/ingester.go b/metrics/engine/ingester.go index 6ed0cc4143e..8141a26c9b6 100644 --- a/metrics/engine/ingester.go +++ b/metrics/engine/ingester.go @@ -4,7 +4,6 @@ import ( "time" "github.com/sirupsen/logrus" - "go.k6.io/k6/metrics" "go.k6.io/k6/output" ) @@ -60,46 +59,28 @@ func (oi *outputIngester) flushMetrics() { // allow us to have a per-bucket lock, instead of one global one, and it // will allow us to split apart the metric Name and Type from its Sink and // Observed fields... - // - // TODO: And, to further optimize things, if every metric (and sub-metric) had a - // sequential integer ID, we would be able to use a slice for these buckets - // and eliminate the map loopkups altogether! - - samplesByMetric := make(map[*metrics.Metric][]metrics.Sample) for _, sampleContainer := range sampleContainers { samples := sampleContainer.GetSamples() - if len(samples) == 0 { continue } for _, sample := range samples { - m := sample.Metric - samples := samplesByMetric[m] - samples = append(samples, sample) - samplesByMetric[m] = samples + // Mark it as observed so it shows in the end-of-test summary + // and add its value to its own sink. + om := oi.metricsEngine.trackedMetrics[sample.Metric.ID] + om.AddSamples(sample) // and also to the same for any submetrics that match the metric sample - for _, sm := range m.Submetrics { + for _, sm := range sample.Metric.Submetrics { if !sample.Tags.Contains(sm.Tags) { continue } - samples := samplesByMetric[sm.Metric] - samples = append(samples, sample) - samplesByMetric[sm.Metric] = samples - } - } - } - for m, samples := range samplesByMetric { - om, ok := oi.metricsEngine.trackedMetrics[m.Name] - if !ok { - // if they are not pre-defined then - // it is not required to sink them - continue + om := oi.metricsEngine.trackedMetrics[sm.Metric.ID] + om.AddSamples(sample) + } } - - om.AddSamples(samples...) } } diff --git a/metrics/engine/ingester_test.go b/metrics/engine/ingester_test.go index 0726e61adb5..e7304772608 100644 --- a/metrics/engine/ingester_test.go +++ b/metrics/engine/ingester_test.go @@ -15,19 +15,16 @@ func TestIngesterOutputFlushMetrics(t *testing.T) { t.Parallel() piState := newTestPreInitState(t) - testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend) - require.NoError(t, err) - ingester := outputIngester{ logger: piState.Logger, metricsEngine: &MetricsEngine{ - trackedMetrics: make(map[string]*trackedMetric), + trackedMetrics: []*trackedMetric{nil}, }, } - ingester.metricsEngine.trackedMetrics["test_metric"] = &trackedMetric{ - Metric: testMetric, - sink: &metrics.TrendSink{}, - } + + testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend) + require.NoError(t, err) + ingester.metricsEngine.trackMetric(testMetric) require.NoError(t, ingester.Start()) ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ @@ -40,7 +37,7 @@ func TestIngesterOutputFlushMetrics(t *testing.T) { }}) require.NoError(t, ingester.Stop()) - ometric := ingester.metricsEngine.trackedMetrics["test_metric"] + ometric := ingester.metricsEngine.trackedMetrics[1] require.NotNil(t, ometric) require.NotNil(t, ometric.sink) assert.Equal(t, testMetric, ometric.Metric) @@ -53,29 +50,25 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) { t.Parallel() piState := newTestPreInitState(t) - testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Gauge) - require.NoError(t, err) - me := &MetricsEngine{ test: &lib.TestRunState{ TestPreInitState: piState, }, } - // it adds the submetric to the parent + testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Gauge) + require.NoError(t, err) + require.Equal(t, 1, int(testMetric.ID)) + + me.trackMetric(testMetric) + require.Len(t, me.trackedMetrics, 2) + + // it attaches the submetric to the parent testSubMetric, err := me.getThresholdMetricOrSubmetric("test_metric{a:1}") require.NoError(t, err) - me.trackedMetrics = map[string]*trackedMetric{ - "test_metric": { - Metric: testMetric, - sink: metrics.NewSinkByType(testMetric.Type), - }, - "test_metric{a:1}": { - Metric: testSubMetric, - sink: metrics.NewSinkByType(testMetric.Type), - }, - } + me.trackMetric(testSubMetric) + require.Len(t, me.trackedMetrics, 3) ingester := outputIngester{ logger: piState.Logger, @@ -92,17 +85,15 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) { }}) require.NoError(t, ingester.Stop()) - require.Len(t, ingester.metricsEngine.trackedMetrics, 2) - // assert the parent has been observed - ometric := ingester.metricsEngine.trackedMetrics["test_metric"] + ometric := ingester.metricsEngine.trackedMetrics[1] require.NotNil(t, ometric) require.NotNil(t, ometric.sink) assert.IsType(t, &metrics.GaugeSink{}, ometric.sink) assert.Equal(t, 21.0, ometric.sink.(*metrics.GaugeSink).Value) // assert the submetric has been observed - ometric = ingester.metricsEngine.trackedMetrics["test_metric{a:1}"] + ometric = ingester.metricsEngine.trackedMetrics[2] require.NotNil(t, ometric) require.NotNil(t, ometric.sink) require.NotNil(t, ometric.Metric.Sub) @@ -119,6 +110,5 @@ func newTestPreInitState(tb testing.TB) *lib.TestPreInitState { Logger: logger, RuntimeOptions: lib.RuntimeOptions{}, Registry: reg, - BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg), } } diff --git a/metrics/metric.go b/metrics/metric.go index f82a44f6590..a25f8f68eb7 100644 --- a/metrics/metric.go +++ b/metrics/metric.go @@ -10,7 +10,9 @@ import ( // A Metric defines the shape of a set of data. type Metric struct { - registry *Registry `json:"-"` + registry *Registry `json:"-"` + + ID uint64 `json:"-"` Name string `json:"name"` Type MetricType `json:"type"` Contains ValueType `json:"contains"` diff --git a/metrics/registry.go b/metrics/registry.go index fe3c40ae707..154d5eec29f 100644 --- a/metrics/registry.go +++ b/metrics/registry.go @@ -11,6 +11,7 @@ import ( // Registry is what can create metrics type Registry struct { metrics map[string]*Metric + ix uint64 l sync.RWMutex rootTagSet *atlas.Node @@ -94,8 +95,10 @@ func (r *Registry) newMetric(name string, mt MetricType, vt ...ValueType) *Metri valueType = vt[0] } + r.ix++ return &Metric{ registry: r, + ID: r.ix, Name: name, Type: mt, Contains: valueType,