Use an incremental integer as ID for metrics
codebien committed Feb 20, 2023
1 parent 5d8fd12 commit db36955
Showing 6 changed files with 135 additions and 126 deletions.
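
In short: every metric now carries a sequential integer ID, assigned at registration starting from 1, so the engine can keep per-metric state in a slice indexed by ID instead of a map keyed by name. A minimal, self-contained sketch of the idea (the type names echo the diff, but this is an illustration, not k6's actual API):

```go
package main

import "fmt"

// Illustrative stand-ins for the k6 types touched by this commit.
type Metric struct {
	ID   uint64 // sequential, assigned at registration, starting from 1
	Name string
}

type trackedMetric struct {
	Metric *Metric
}

func main() {
	// Before: a name-keyed map; every lookup hashes the metric name.
	byName := make(map[string]*trackedMetric)

	// After: an ID-indexed slice; position 0 stays nil so the 1-based
	// IDs can be used directly as slice indices.
	byID := []*trackedMetric{nil}

	for i, name := range []string{"http_reqs", "vus"} {
		m := &Metric{ID: uint64(i + 1), Name: name}
		tm := &trackedMetric{Metric: m}
		byName[name] = tm
		byID = append(byID, tm) // registration order keeps ID == index
	}

	fmt.Println(byName["vus"].Metric.ID) // map lookup: prints 2
	fmt.Println(byID[2].Metric.Name)     // slice index: prints "vus"
}
```

Keeping slot 0 as a nil sentinel avoids an off-by-one adjustment on every access, which is also why the new `trackMetric` below seeds the slice with a single nil element.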
114 changes: 66 additions & 48 deletions metrics/engine/engine.go
@@ -54,8 +54,8 @@ type MetricsEngine struct {
     // they can be both top-level metrics or sub-metrics
     //
     // TODO: remove the tracked map using the sequence number
-    metricsWithThresholds map[string]metrics.Thresholds
-    trackedMetrics        map[string]*trackedMetric
+    metricsWithThresholds map[uint64]metrics.Thresholds
+    trackedMetrics        []*trackedMetric

     breachedThresholdsCount uint32
 }
@@ -65,23 +65,27 @@ func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
     me := &MetricsEngine{
         test:   runState,
         logger: runState.Logger.WithField("component", "metrics-engine"),
-        metricsWithThresholds: make(map[string]metrics.Thresholds),
-        trackedMetrics:        make(map[string]*trackedMetric),
+        metricsWithThresholds: make(map[uint64]metrics.Thresholds),
     }

     if me.test.RuntimeOptions.NoSummary.Bool &&
         me.test.RuntimeOptions.NoThresholds.Bool {
         return me, nil
     }

-    for _, registered := range me.test.Registry.All() {
-        typ := registered.Type
-        me.trackedMetrics[registered.Name] = &trackedMetric{
-            Metric: registered,
-            sink:   metrics.NewSinkByType(typ),
-        }
+    // It adds all the registered metrics as tracked.
+    // The custom metrics are also added because they have
+    // been seen and registered during the initEnv run,
+    // which must run before this constructor is called.
+    registered := me.test.Registry.All()
+    me.trackedMetrics = make([]*trackedMetric, len(registered)+1)
+    for _, mreg := range registered {
+        me.trackMetric(mreg)
     }

+    // It registers and tracks all the metrics defined by the thresholds.
+    // They are also marked as observed because
+    // the summary wants them even if they didn't receive any sample.
     err := me.initSubMetricsAndThresholds()
     if err != nil {
         return nil, err
@@ -156,27 +160,19 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
             return fmt.Errorf("invalid metric '%s' in threshold definitions: %w", metricName, err)
         }

-        // TODO: check and confirm that this check is not an issue
         if len(thresholds.Thresholds) > 0 {
-            me.metricsWithThresholds[metric.Name] = thresholds
+            me.metricsWithThresholds[metric.ID] = thresholds
         }

         // Mark the metric (and the parent metric, if we're dealing with a
         // submetric) as observed, so they are shown in the end-of-test summary,
-        // even if they don't have any metric samples during the test run
-
-        me.trackedMetrics[metric.Name] = &trackedMetric{
-            Metric:   metric,
-            sink:     metrics.NewSinkByType(metric.Type),
-            observed: true,
-        }
+        // even if they don't have any metric samples during the test run.
+        me.trackMetric(metric)
+        me.trackedMetrics[metric.ID].observed = true

         if metric.Sub != nil {
-            me.trackedMetrics[metric.Sub.Parent.Name] = &trackedMetric{
-                Metric:   metric.Sub.Parent,
-                sink:     metrics.NewSinkByType(metric.Sub.Parent.Type),
-                observed: true,
-            }
+            me.trackMetric(metric.Sub.Parent)
+            me.trackedMetrics[metric.Sub.Parent.ID].observed = true
         }
     }

@@ -187,15 +183,37 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
         if err != nil {
             return err // shouldn't happen, but ¯\_(ツ)_/¯
         }
-        me.trackedMetrics[expResMetric.Name] = &trackedMetric{
-            Metric: expResMetric,
-            sink:   metrics.NewSinkByType(expResMetric.Type),
-        }
+        me.trackMetric(expResMetric)
     }

+    // TODO: the trackedMetrics slice is fixed now;
+    // to be optimal we could shrink the slice cap.
+
     return nil
 }

+func (me *MetricsEngine) trackMetric(m *metrics.Metric) {
+    tm := &trackedMetric{
+        Metric: m,
+        sink:   metrics.NewSinkByType(m.Type),
+    }
+
+    if me.trackedMetrics == nil {
+        // Metric IDs start from one,
+        // so the zero-th position is skipped
+        // to simplify the access operations.
+        me.trackedMetrics = []*trackedMetric{nil}
+    }
+
+    if m.ID >= uint64(len(me.trackedMetrics)) {
+        // expand the slice
+        me.trackedMetrics = append(me.trackedMetrics, tm)
+        return
+    }
+
+    me.trackedMetrics[m.ID] = tm
+}
+
 // StartThresholdCalculations spins up a new goroutine to crunch thresholds and
 // returns a callback that will stop the goroutine and finalizes calculations.
 func (me *MetricsEngine) StartThresholdCalculations(
@@ -261,40 +279,36 @@ func (me *MetricsEngine) evaluateThresholds(
 ) (breachedThresholds []string, shouldAbort bool) {
     t := getCurrentTestRunDuration()

-    computeThresholds := func(metricName string, ths metrics.Thresholds) {
-        observedMetric, ok := me.trackedMetrics[metricName]
-        if !ok {
-            panic(fmt.Sprintf("observed metric %q not found for the threhsolds", metricName))
-        }
-
-        observedMetric.m.Lock()
-        defer observedMetric.m.Unlock()
+    computeThresholds := func(tm *trackedMetric, ths metrics.Thresholds) {
+        tm.m.Lock()
+        defer tm.m.Unlock()

         // If either the metric has no thresholds defined, or its sinks
         // are empty, let's ignore its thresholds execution at this point.
-        if len(ths.Thresholds) == 0 || (ignoreEmptySinks && observedMetric.sink.IsEmpty()) {
+        if len(ths.Thresholds) == 0 || (ignoreEmptySinks && tm.sink.IsEmpty()) {
             return
         }
-        observedMetric.tainted = false
+        tm.tainted = false

-        succ, err := ths.Run(observedMetric.sink, t)
+        succ, err := ths.Run(tm.sink, t)
         if err != nil {
-            me.logger.WithField("metric_name", metricName).WithError(err).Error("Threshold error")
+            me.logger.WithField("metric_name", tm.Name).WithError(err).Error("Threshold error")
             return
         }
         if succ {
             return // threshold passed
         }
-        breachedThresholds = append(breachedThresholds, metricName)
-        observedMetric.tainted = true
+        breachedThresholds = append(breachedThresholds, tm.Name)
+        tm.tainted = true
         if ths.Abort {
             shouldAbort = true
         }
     }

     me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds))
-    for m, ths := range me.metricsWithThresholds {
-        computeThresholds(m, ths)
+    for mid, ths := range me.metricsWithThresholds {
+        tracked := me.trackedMetrics[mid]
+        computeThresholds(tracked, ths)
     }

     if len(breachedThresholds) > 0 {
@@ -308,7 +322,10 @@ func (me *MetricsEngine) evaluateThresholds(
 // ObservedMetrics returns all observed metrics.
 func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric {
     ometrics := make(map[string]metrics.ObservedMetric, len(me.trackedMetrics))
-    for _, tm := range me.trackedMetrics {
+
+    // It skips the first item as it is nil by definition.
+    for i := 1; i < len(me.trackedMetrics); i++ {
+        tm := me.trackedMetrics[i]
         tm.m.Lock()
         if !tm.observed {
             tm.m.Unlock()
@@ -322,11 +339,12 @@ func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric {

 // ObservedMetricByID returns the observed metric by the provided id.
 func (me *MetricsEngine) ObservedMetricByID(id string) (metrics.ObservedMetric, bool) {
-    tm, ok := me.trackedMetrics[id]
-    if !ok {
+    m := me.test.Registry.Get(id)
+    if m == nil {
         return metrics.ObservedMetric{}, false
     }

+    tm := me.trackedMetrics[m.ID]
     tm.m.Lock()
     defer tm.m.Unlock()

@@ -361,7 +379,7 @@ func (me *MetricsEngine) trackedToObserved(tm *trackedMetric) metrics.ObservedMetric {
         Tainted: null.BoolFrom(tm.tainted), // TODO: if null it's required then add to trackedMetric
     }

-    definedThs, ok := me.metricsWithThresholds[tm.Name]
+    definedThs, ok := me.metricsWithThresholds[tm.ID]
     if !ok || len(definedThs.Thresholds) < 1 {
         return om
     }
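A note on why the single `append` inside `trackMetric` is safe: IDs are handed out sequentially, so a metric whose ID does not fit in the slice yet is always the next one, with `m.ID == len(trackedMetrics)`. A hedged restatement as a free function, using the same illustrative stand-in types as the first sketch:

```go
// Illustrative stand-ins, as in the first sketch (not k6's real types).
type Metric struct {
	ID   uint64
	Name string
}

type trackedMetric struct {
	Metric *Metric
}

// track mirrors the trackMetric logic in the diff above.
func track(tracked []*trackedMetric, m *Metric) []*trackedMetric {
	tm := &trackedMetric{Metric: m}

	if len(tracked) == 0 {
		// IDs start from one, so position 0 stays a nil sentinel.
		tracked = []*trackedMetric{nil}
	}

	if m.ID >= uint64(len(tracked)) {
		// With sequential IDs this is always the next slot
		// (m.ID == len(tracked)), so one append grows the slice by one.
		return append(tracked, tm)
	}

	// The slice was pre-sized, as in the constructor, so write in place.
	tracked[m.ID] = tm
	return tracked
}
```

If IDs could arrive out of order, the append branch would place a metric at the wrong index; the sequential-ID invariant is what keeps the slice representation sound.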
59 changes: 37 additions & 22 deletions metrics/engine/engine_test.go
@@ -117,22 +117,26 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) {
         t.Run(tc.threshold, func(t *testing.T) {
             t.Parallel()
             me := newTestMetricsEngine(t)
+            m1 := me.test.Registry.MustNewMetric("m1", metrics.Counter)
+            m2 := me.test.Registry.MustNewMetric("m2", metrics.Counter)

             ths := metrics.NewThresholds([]string{tc.threshold})
             require.NoError(t, ths.Parse())
             ths.Thresholds[0].AbortOnFail = tc.abortOnFail

-            me.metricsWithThresholds["m1"] = ths
-            me.metricsWithThresholds["m2"] = metrics.Thresholds{}
+            me.metricsWithThresholds[1] = ths
+            me.metricsWithThresholds[2] = metrics.Thresholds{}

             csink := &metrics.CounterSink{}
             csink.Add(metrics.Sample{Value: 6.0})
-            me.trackedMetrics["m1"] = &trackedMetric{
-                sink: csink,
-            }
-            me.trackedMetrics["m2"] = &trackedMetric{
-                sink: &metrics.CounterSink{},
-            }
+            me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
+                Metric: m1,
+                sink:   csink,
+            })
+            me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
+                Metric: m2,
+                sink:   &metrics.CounterSink{},
+            })

             breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
             require.Equal(t, tc.abortOnFail, abort)
@@ -155,18 +159,18 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {
     require.NoError(t, ths.Parse())
     ths.Thresholds[0].AbortOnFail = true

-    me.metricsWithThresholds["m1"] = ths
-    me.metricsWithThresholds["m2"] = metrics.Thresholds{}
+    me.metricsWithThresholds[1] = ths
+    me.metricsWithThresholds[2] = metrics.Thresholds{}

-    me.trackedMetrics["m1"] = &trackedMetric{
+    me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
         Metric: m1,
         sink:   &metrics.CounterSink{},
-    }
+    })

-    me.trackedMetrics["m2"] = &trackedMetric{
+    me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
         Metric: m2,
         sink:   &metrics.CounterSink{},
-    }
+    })

     breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
     require.True(t, abort)
@@ -191,23 +195,34 @@ func TestMetricsEngineObserveMetricByID(t *testing.T) {
     require.NoError(t, ths.Parse())
     ths.Thresholds[0].AbortOnFail = true

-    me.metricsWithThresholds["m1"] = metrics.Thresholds{}
-    me.metricsWithThresholds["m2"] = ths
+    me.metricsWithThresholds[1] = metrics.Thresholds{}
+    me.metricsWithThresholds[2] = ths

-    me.trackedMetrics["m1"] = &trackedMetric{
+    me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
         Metric: m1,
-    }
-    me.trackedMetrics["m2"] = &trackedMetric{
+    })
+    me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
         Metric:   m2,
         observed: true,
-    }
+    })

     ometric, found := me.ObservedMetricByID("m2")
     require.True(t, found)
     assert.Equal(t, m2, ometric.Metric)
     assert.Len(t, ometric.Thresholds, 1)
 }

+func TestMetricsEngineTrackMetric(t *testing.T) {
+    t.Parallel()
+
+    me := newTestMetricsEngine(t)
+    m := me.test.Registry.MustNewMetric("my_counter", metrics.Counter)
+    me.trackMetric(m)
+    require.Len(t, me.trackedMetrics, 2)
+    assert.Equal(t, m, me.trackedMetrics[1].Metric)
+    assert.IsType(t, &metrics.CounterSink{}, me.trackedMetrics[1].sink)
+}
+
 func newTestMetricsEngine(t *testing.T) MetricsEngine {
     trs := &lib.TestRunState{
         TestPreInitState: &lib.TestPreInitState{
@@ -219,8 +234,8 @@ func newTestMetricsEngine(t *testing.T) MetricsEngine {
     return MetricsEngine{
         logger: trs.Logger,
         test:   trs,
-        metricsWithThresholds: make(map[string]metrics.Thresholds),
-        trackedMetrics:        make(map[string]*trackedMetric),
+        metricsWithThresholds: make(map[uint64]metrics.Thresholds),
+        trackedMetrics:        []*trackedMetric{nil},
     }
 }

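The updated tests can hard-code the keys `1` and `2` because the two metrics registered first receive exactly those IDs. A small self-contained sketch of that invariant, again with hypothetical stand-in types rather than k6's:

```go
package engine_test

import "testing"

// Illustrative stand-ins, as in the earlier sketches.
type Metric struct {
	ID   uint64
	Name string
}

type trackedMetric struct {
	Metric *Metric
}

func TestIDAlignsWithIndex(t *testing.T) {
	tracked := []*trackedMetric{nil} // slot 0 is the nil sentinel

	// The first two registered metrics get IDs 1 and 2, so appending in
	// registration order lands each entry at the index equal to its ID.
	m1 := &Metric{ID: 1, Name: "m1"}
	m2 := &Metric{ID: 2, Name: "m2"}
	tracked = append(tracked, &trackedMetric{Metric: m1})
	tracked = append(tracked, &trackedMetric{Metric: m2})

	if tracked[m1.ID].Metric != m1 || tracked[m2.ID].Metric != m2 {
		t.Fatal("metric ID does not match its slice index")
	}
}
```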
35 changes: 8 additions & 27 deletions metrics/engine/ingester.go
@@ -4,7 +4,6 @@ import (
     "time"

     "github.com/sirupsen/logrus"
-    "go.k6.io/k6/metrics"
     "go.k6.io/k6/output"
 )

@@ -60,46 +59,28 @@ func (oi *outputIngester) flushMetrics() {
     // allow us to have a per-bucket lock, instead of one global one, and it
     // will allow us to split apart the metric Name and Type from its Sink and
     // Observed fields...
-    //
-    // TODO: And, to further optimize things, if every metric (and sub-metric) had a
-    // sequential integer ID, we would be able to use a slice for these buckets
-    // and eliminate the map loopkups altogether!

-    samplesByMetric := make(map[*metrics.Metric][]metrics.Sample)
-
     for _, sampleContainer := range sampleContainers {
         samples := sampleContainer.GetSamples()

         if len(samples) == 0 {
             continue
         }

         for _, sample := range samples {
-            m := sample.Metric
-            samples := samplesByMetric[m]
-            samples = append(samples, sample)
-            samplesByMetric[m] = samples
+            // Mark it as observed so it shows in the end-of-test summary
+            // and add its value to its own sink.
+            om := oi.metricsEngine.trackedMetrics[sample.Metric.ID]
+            om.AddSamples(sample)

             // and also to the same for any submetrics that match the metric sample
-            for _, sm := range m.Submetrics {
+            for _, sm := range sample.Metric.Submetrics {
                 if !sample.Tags.Contains(sm.Tags) {
                     continue
                 }
-                samples := samplesByMetric[sm.Metric]
-                samples = append(samples, sample)
-                samplesByMetric[sm.Metric] = samples
+                om := oi.metricsEngine.trackedMetrics[sm.Metric.ID]
+                om.AddSamples(sample)
             }
         }
     }
-
-    for m, samples := range samplesByMetric {
-        om, ok := oi.metricsEngine.trackedMetrics[m.Name]
-        if !ok {
-            // if they are not pre-defined then
-            // it is not required to sink them
-            continue
-        }
-
-        om.AddSamples(samples...)
-    }
 }
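
The ingester change drops the intermediate `samplesByMetric` map and its second flushing pass entirely; each sample is now sunk immediately through its metric's ID. A self-contained sketch of the resulting single-pass shape (all types are simplified stand-ins, not the real `metrics` package):

```go
package main

import "fmt"

type Metric struct{ ID uint64 }

type trackedMetric struct {
	total float64 // stands in for the metric's real sink
}

type Sample struct {
	Metric *Metric
	Value  float64
}

// flush routes every sample straight to its metric's slot: one slice
// index per sample, no map allocation and no second pass.
func flush(tracked []*trackedMetric, samples []Sample) {
	for _, s := range samples {
		tracked[s.Metric.ID].total += s.Value
	}
}

func main() {
	m := &Metric{ID: 1}
	tracked := []*trackedMetric{nil, {}} // slot 0 nil, slot 1 for m
	flush(tracked, []Sample{{m, 1}, {m, 2.5}})
	fmt.Println(tracked[m.ID].total) // prints 3.5
}
```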