Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add a failure type label to failure metrics in storage package (#60)
Browse files Browse the repository at this point in the history
Adds failure type as a tag on head, read and write failures emitted by storage package.
  • Loading branch information
EngHabu authored Mar 20, 2020
1 parent d79f354 commit 0e9ca38
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 46 deletions.
11 changes: 11 additions & 0 deletions contextutils/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ var logKeys = []Key{
ResourceVersionKey,
}

// MetricKeysFromStrings is a convenience method to convert a slice of strings into a slice of Keys
func MetricKeysFromStrings(keys []string) []Key {
res := make([]Key, 0, len(keys))

for _, k := range keys {
res = append(res, Key(k))
}

return res
}

// Gets a new context with the resource version set.
func WithResourceVersion(ctx context.Context, resourceVersion string) context.Context {
return context.WithValue(ctx, ResourceVersionKey, resourceVersion)
Expand Down
14 changes: 10 additions & 4 deletions promutils/labeled/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Counter struct {
*prometheus.CounterVec

prometheus.Counter
additionalLabels []contextutils.Key
}

// Inc increments the counter by 1. Use Add to increment it by arbitrary non-negative values. The data point will be
Expand All @@ -33,7 +34,7 @@ func (c Counter) Inc(ctx context.Context) {
// Add adds the given value to the counter. It panics if the value is < 0.. The data point will be labeled with values
// from context. See labeled.SetMetricsKeys for information about to configure that.
func (c Counter) Add(ctx context.Context, v float64) {
counter, err := c.CounterVec.GetMetricWith(contextutils.Values(ctx, metricKeys...))
counter, err := c.CounterVec.GetMetricWith(contextutils.Values(ctx, append(metricKeys, c.additionalLabels...)...))
if err != nil {
panic(err.Error())
}
Expand All @@ -51,15 +52,20 @@ func NewCounter(name, description string, scope promutils.Scope, opts ...MetricO
panic(ErrNeverSet)
}

c := Counter{
CounterVec: scope.MustNewCounterVec(name, description, metricStringKeys...),
}
c := Counter{}

for _, opt := range opts {
if _, emitUnlabeledMetric := opt.(EmitUnlabeledMetricOption); emitUnlabeledMetric {
c.Counter = scope.MustNewCounter(GetUnlabeledMetricName(name), description)
} else if additionalLabels, casted := opt.(AdditionalLabelsOption); casted {
c.CounterVec = scope.MustNewCounterVec(name, description, append(metricStringKeys, additionalLabels.Labels...)...)
c.additionalLabels = contextutils.MetricKeysFromStrings(additionalLabels.Labels)
}
}

if c.CounterVec == nil {
c.CounterVec = scope.MustNewCounterVec(name, description, metricStringKeys...)
}

return c
}
9 changes: 9 additions & 0 deletions promutils/labeled/metric_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@ type EmitUnlabeledMetricOption struct {
func (EmitUnlabeledMetricOption) isMetricOption() {}

var EmitUnlabeledMetric = EmitUnlabeledMetricOption{}

// AdditionalLabelsOption instructs the labeled metric to expect additional labels scoped for this just this metric
// in the context passed.
type AdditionalLabelsOption struct {
// A collection of labels to look for in the passed context.
Labels []string
}

func (AdditionalLabelsOption) isMetricOption() {}
16 changes: 12 additions & 4 deletions promutils/labeled/stopwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type StopWatch struct {
// through a flag in the constructor, we initialize this additional untagged stopwatch to compute percentiles
// across tags.
promutils.StopWatch

additionalLabels []contextutils.Key
}

// Start creates a new Instance of the StopWatch called a Timer that is closeable/stoppable.
Expand Down Expand Up @@ -46,7 +48,7 @@ func (c StopWatch) Start(ctx context.Context) Timer {
// Observes specified duration between the start and end time. The data point will be labeled with values from context.
// See labeled.SetMetricsKeys for information about to configure that.
func (c StopWatch) Observe(ctx context.Context, start, end time.Time) {
w, err := c.StopWatchVec.GetMetricWith(contextutils.Values(ctx, metricKeys...))
w, err := c.StopWatchVec.GetMetricWith(contextutils.Values(ctx, append(metricKeys, c.additionalLabels...)...))
if err != nil {
panic(err.Error())
}
Expand All @@ -73,15 +75,21 @@ func NewStopWatch(name, description string, scale time.Duration, scope promutils
panic(ErrNeverSet)
}

sw := StopWatch{
StopWatchVec: scope.MustNewStopWatchVec(name, description, scale, metricStringKeys...),
}
sw := StopWatch{}

for _, opt := range opts {
if _, emitUnableMetric := opt.(EmitUnlabeledMetricOption); emitUnableMetric {
sw.StopWatch = scope.MustNewStopWatch(GetUnlabeledMetricName(name), description, scale)
} else if additionalLabels, casted := opt.(AdditionalLabelsOption); casted {
sw.StopWatchVec = scope.MustNewStopWatchVec(name, description, scale,
append(metricStringKeys, additionalLabels.Labels...)...)
sw.additionalLabels = contextutils.MetricKeysFromStrings(additionalLabels.Labels)
}
}

if sw.StopWatchVec == nil {
sw.StopWatchVec = scope.MustNewStopWatchVec(name, description, scale, metricStringKeys...)
}

return sw
}
74 changes: 42 additions & 32 deletions storage/stow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,34 @@ import (
"io"
"time"

"github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/contextutils"

"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/prometheus/client_golang/prometheus"
"github.com/lyft/flytestdlib/errors"

"github.com/lyft/flytestdlib/promutils"

"github.com/graymeta/stow"
errs "github.com/pkg/errors"
)

const (
FailureTypeLabel contextutils.Key = "failure_type"
)

type stowMetrics struct {
BadReference prometheus.Counter
BadContainer prometheus.Counter
BadReference labeled.Counter
BadContainer labeled.Counter

HeadFailure prometheus.Counter
HeadLatency promutils.StopWatch
HeadFailure labeled.Counter
HeadLatency labeled.StopWatch

ReadFailure prometheus.Counter
ReadOpenLatency promutils.StopWatch
ReadFailure labeled.Counter
ReadOpenLatency labeled.StopWatch

WriteFailure prometheus.Counter
WriteLatency promutils.StopWatch
WriteFailure labeled.Counter
WriteLatency labeled.StopWatch
}

// Implements DataStore to talk to stow location store.
Expand All @@ -50,9 +56,9 @@ func (s StowMetadata) Exists() bool {
return s.exists
}

func (s *StowStore) getContainer(container string) (c stow.Container, err error) {
func (s *StowStore) getContainer(ctx context.Context, container string) (c stow.Container, err error) {
if s.Container.Name() != container {
s.metrics.BadContainer.Inc()
s.metrics.BadContainer.Inc(ctx)
return nil, errs.Wrapf(stow.ErrNotFound, "Conf container:%v != Passed Container:%v", s.Container.Name(), container)
}

Expand All @@ -62,16 +68,16 @@ func (s *StowStore) getContainer(container string) (c stow.Container, err error)
func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata, error) {
_, c, k, err := reference.Split()
if err != nil {
s.metrics.BadReference.Inc()
s.metrics.BadReference.Inc(ctx)
return nil, err
}

container, err := s.getContainer(c)
container, err := s.getContainer(ctx, c)
if err != nil {
return nil, err
}

t := s.metrics.HeadLatency.Start()
t := s.metrics.HeadLatency.Start(ctx)
item, err := container.Item(k)
if err == nil {
if _, err = item.Metadata(); err == nil {
Expand All @@ -85,29 +91,31 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata
}
}
}
s.metrics.HeadFailure.Inc()

if IsNotFound(err) {
return StowMetadata{exists: false}, nil
}

incFailureCounterForError(ctx, s.metrics.HeadFailure, err)
return StowMetadata{exists: false}, errs.Wrapf(err, "path:%v", k)
}

func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
_, c, k, err := reference.Split()
if err != nil {
s.metrics.BadReference.Inc()
s.metrics.BadReference.Inc(ctx)
return nil, err
}

container, err := s.getContainer(c)
container, err := s.getContainer(ctx, c)
if err != nil {
return nil, err
}

t := s.metrics.ReadOpenLatency.Start()
t := s.metrics.ReadOpenLatency.Start(ctx)
item, err := container.Item(k)
if err != nil {
s.metrics.ReadFailure.Inc()
incFailureCounterForError(ctx, s.metrics.ReadFailure, err)
return nil, err
}
t.Stop()
Expand All @@ -127,21 +135,22 @@ func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.Re
func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error {
_, c, k, err := reference.Split()
if err != nil {
s.metrics.BadReference.Inc()
s.metrics.BadReference.Inc(ctx)
return err
}

container, err := s.getContainer(c)
container, err := s.getContainer(ctx, c)
if err != nil {
return err
}

t := s.metrics.WriteLatency.Start()
t := s.metrics.WriteLatency.Start(ctx)
_, err = container.Put(k, raw, size, opts.Metadata)
if err != nil {
s.metrics.WriteFailure.Inc()
incFailureCounterForError(ctx, s.metrics.WriteFailure, err)
return errs.Wrapf(err, "Failed to write data [%vb] to path [%v].", size, k)
}

t.Stop()

return nil
Expand All @@ -152,21 +161,22 @@ func (s *StowStore) GetBaseContainerFQN(ctx context.Context) DataReference {
}

func NewStowRawStore(containerBaseFQN DataReference, container stow.Container, metricsScope promutils.Scope) (*StowStore, error) {
failureTypeOption := labeled.AdditionalLabelsOption{Labels: []string{FailureTypeLabel.String()}}
self := &StowStore{
Container: container,
containerBaseFQN: containerBaseFQN,
metrics: &stowMetrics{
BadReference: metricsScope.MustNewCounter("bad_key", "Indicates the provided storage reference/key is incorrectly formatted"),
BadContainer: metricsScope.MustNewCounter("bad_container", "Indicates request for a container that has not been initialized"),
BadReference: labeled.NewCounter("bad_key", "Indicates the provided storage reference/key is incorrectly formatted", metricsScope, labeled.EmitUnlabeledMetric),
BadContainer: labeled.NewCounter("bad_container", "Indicates request for a container that has not been initialized", metricsScope, labeled.EmitUnlabeledMetric),

HeadFailure: metricsScope.MustNewCounter("head_failure", "Indicates failure in HEAD for a given reference"),
HeadLatency: metricsScope.MustNewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond),
HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", metricsScope, labeled.EmitUnlabeledMetric),
HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, metricsScope, labeled.EmitUnlabeledMetric),

ReadFailure: metricsScope.MustNewCounter("read_failure", "Indicates failure in GET for a given reference"),
ReadOpenLatency: metricsScope.MustNewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond),
ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", metricsScope, labeled.EmitUnlabeledMetric, failureTypeOption),
ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, metricsScope, labeled.EmitUnlabeledMetric),

WriteFailure: metricsScope.MustNewCounter("write_failure", "Indicates failure in storing/PUT for a given reference"),
WriteLatency: metricsScope.MustNewStopWatch("write", "Time to write an object irrespective of size", time.Millisecond),
WriteFailure: labeled.NewCounter("write_failure", "Indicates failure in storing/PUT for a given reference", metricsScope, labeled.EmitUnlabeledMetric, failureTypeOption),
WriteLatency: labeled.NewStopWatch("write", "Time to write an object irrespective of size", time.Millisecond, metricsScope, labeled.EmitUnlabeledMetric),
},
}

Expand Down
27 changes: 21 additions & 6 deletions storage/utils.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package storage

import (
"context"
"os"

errors2 "github.com/lyft/flytestdlib/errors"
stdErrs "github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/graymeta/stow"
"github.com/pkg/errors"
)

var (
ErrExceedsLimit errors2.ErrorCode = "LIMIT_EXCEEDED"
ErrFailedToWriteCache errors2.ErrorCode = "CACHE_WRITE_FAILED"
ErrExceedsLimit stdErrs.ErrorCode = "LIMIT_EXCEEDED"
ErrFailedToWriteCache stdErrs.ErrorCode = "CACHE_WRITE_FAILED"
)

const (
genericFailureTypeLabel = "Generic"
)

// Gets a value indicating whether the underlying error is a Not Found error.
Expand All @@ -20,7 +26,7 @@ func IsNotFound(err error) bool {
return true
}

if errors2.IsCausedByError(err, stow.ErrNotFound) {
if stdErrs.IsCausedByError(err, stow.ErrNotFound) {
return true
}

Expand All @@ -38,11 +44,11 @@ func IsExists(err error) bool {

// Gets a value indicating whether the root cause of error is a "limit exceeded" error.
func IsExceedsLimit(err error) bool {
return errors2.IsCausedBy(err, ErrExceedsLimit)
return stdErrs.IsCausedBy(err, ErrExceedsLimit)
}

func IsFailedWriteToCache(err error) bool {
return errors2.IsCausedBy(err, ErrFailedToWriteCache)
return stdErrs.IsCausedBy(err, ErrFailedToWriteCache)
}

func MapStrings(mapper func(string) string, strings ...string) []string {
Expand All @@ -56,3 +62,12 @@ func MapStrings(mapper func(string) string, strings ...string) []string {

return strings
}

func incFailureCounterForError(ctx context.Context, counter labeled.Counter, err error) {
errCode, found := stdErrs.GetErrorCode(err)
if found {
counter.Inc(context.WithValue(ctx, FailureTypeLabel, errCode))
} else {
counter.Inc(context.WithValue(ctx, FailureTypeLabel, genericFailureTypeLabel))
}
}

0 comments on commit 0e9ca38

Please sign in to comment.