Skip to content

Commit

Permalink
Compute OTel metrics (#2744)
Browse files Browse the repository at this point in the history
* Compute OTel metrics

GCSFuse metrics have been implemented in OTel. This means that one can
now use the available OTel exporters to export these metrics.
  • Loading branch information
kislaykishore authored Dec 16, 2024
1 parent 78b69e8 commit 840605f
Show file tree
Hide file tree
Showing 8 changed files with 445 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cmd/legacy_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ func Mount(newConfig *cfg.Config, bucketName, mountPoint string) (err error) {
if cfg.IsMetricsEnabled(&newConfig.Metrics) {
if newConfig.Metrics.EnableOtel {
metricExporterShutdownFn = monitor.SetupOTelMetricExporters(ctx, newConfig)
if metricHandle, err = common.NewOTelMetrics(); err != nil {
metricHandle = common.NewNoopMetrics()
}
} else {
metricExporterShutdownFn = monitor.SetupOpenCensusExporters(newConfig)
if metricHandle, err = common.NewOCMetrics(); err != nil {
Expand Down
19 changes: 19 additions & 0 deletions common/oc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
Expand Down Expand Up @@ -72,6 +74,23 @@ func attrsToTags(attrs []MetricAttr) []tag.Mutator {
}
return mutators
}

func attrsToRecordOption(attrs []MetricAttr) []metric.RecordOption {
otelOptions := make([]metric.RecordOption, 0, len(attrs))
for _, attr := range attrs {
otelOptions = append(otelOptions, metric.WithAttributes(attribute.String(attr.Key, attr.Value)))
}
return otelOptions
}

func attrsToAddOption(attrs []MetricAttr) []metric.AddOption {
otelOptions := make([]metric.AddOption, 0, len(attrs))
for _, attr := range attrs {
otelOptions = append(otelOptions, metric.WithAttributes(attribute.String(attr.Key, attr.Value)))
}
return otelOptions
}

func (o *ocMetrics) GCSReadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.gcsReadBytesCount, inc, attrs, "GCS read bytes count")
}
Expand Down
139 changes: 139 additions & 0 deletions common/otel_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"
"errors"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

var (
fsOpsMeter = otel.Meter("fs_op")
gcsMeter = otel.Meter("gcs")
fileCacheMeter = otel.Meter("file_cache")
)

// otelMetrics maintains the list of all metrics computed in GCSFuse.
type otelMetrics struct {
fsOpsCount metric.Int64Counter
fsOpsErrorCount metric.Int64Counter
fsOpsLatency metric.Float64Histogram

gcsReadCount metric.Int64Counter
gcsReadBytesCount metric.Int64Counter
gcsReaderCount metric.Int64Counter
gcsRequestCount metric.Int64Counter
gcsRequestLatency metric.Float64Histogram
gcsDownloadBytesCount metric.Int64Counter

fileCacheReadCount metric.Int64Counter
fileCacheReadBytesCount metric.Int64Counter
fileCacheReadLatency metric.Float64Histogram
}

func (o *otelMetrics) GCSReadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.gcsReadBytesCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) GCSReaderCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.gcsReaderCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) GCSRequestCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.gcsRequestCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) GCSRequestLatency(ctx context.Context, value float64, attrs []MetricAttr) {
o.gcsRequestLatency.Record(ctx, value, attrsToRecordOption(attrs)...)
}

func (o *otelMetrics) GCSReadCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.gcsReadCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) GCSDownloadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.gcsDownloadBytesCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) OpsCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.fsOpsCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) OpsLatency(ctx context.Context, value float64, attrs []MetricAttr) {
o.fsOpsLatency.Record(ctx, value, attrsToRecordOption(attrs)...)
}

func (o *otelMetrics) OpsErrorCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.fsOpsErrorCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) FileCacheReadCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.fileCacheReadCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) FileCacheReadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
o.fileCacheReadBytesCount.Add(ctx, inc, attrsToAddOption(attrs)...)
}

func (o *otelMetrics) FileCacheReadLatency(ctx context.Context, value float64, attrs []MetricAttr) {
o.fileCacheReadLatency.Record(ctx, value, attrsToRecordOption(attrs)...)
}

func NewOTelMetrics() (MetricHandle, error) {
fsOpsCount, err1 := fsOpsMeter.Int64Counter("fs/ops_count", metric.WithDescription("The number of ops processed by the file system."))
fsOpsLatency, err2 := fsOpsMeter.Float64Histogram("fs/ops_latency", metric.WithDescription("The latency of a file system operation."), metric.WithUnit("us"),
defaultLatencyDistribution)
fsOpsErrorCount, err3 := fsOpsMeter.Int64Counter("fs/ops_error_count", metric.WithDescription("The number of errors generated by file system operation."))

gcsReadCount, err4 := gcsMeter.Int64Counter("gcs/read_count", metric.WithDescription("Specifies the number of gcs reads made along with type - Sequential/Random"))
gcsDownloadBytesCount, err5 := gcsMeter.Int64Counter("gcs/download_bytes_count",
metric.WithDescription("The cumulative number of bytes downloaded from GCS along with type - Sequential/Random"),
metric.WithUnit("By"))
gcsReadBytesCount, err6 := gcsMeter.Int64Counter("gcs/read_bytes_count", metric.WithDescription("The number of bytes read from GCS objects."), metric.WithUnit("By"))
gcsReaderCount, err7 := gcsMeter.Int64Counter("gcs/reader_count", metric.WithDescription("The number of GCS object readers opened or closed."))
gcsRequestCount, err8 := gcsMeter.Int64Counter("gcs/request_count", metric.WithDescription("The cumulative number of GCS requests processed."))
gcsRequestLatency, err9 := gcsMeter.Float64Histogram("gcs/request_latency", metric.WithDescription("The latency of a GCS request."), metric.WithUnit("ms"))

fileCacheReadCount, err10 := fileCacheMeter.Int64Counter("file_cache/read_count",
metric.WithDescription("Specifies the number of read requests made via file cache along with type - Sequential/Random and cache hit - true/false"))
fileCacheReadBytesCount, err11 := fileCacheMeter.Int64Counter("file_cache/read_bytes_count",
metric.WithDescription("The cumulative number of bytes read from file cache along with read type - Sequential/Random"),
metric.WithUnit("By"))
fileCacheReadLatency, err12 := fileCacheMeter.Float64Histogram("file_cache/read_latencies",
metric.WithDescription("Latency of read from file cache along with cache hit - true/false"),
metric.WithUnit("us"),
defaultLatencyDistribution)

if err := errors.Join(err1, err2, err3, err4, err5, err6, err7, err8, err9, err10, err11, err12); err != nil {
return nil, err
}
return &otelMetrics{
fsOpsCount: fsOpsCount,
fsOpsErrorCount: fsOpsErrorCount,
fsOpsLatency: fsOpsLatency,
gcsReadCount: gcsReadCount,
gcsReadBytesCount: gcsReadBytesCount,
gcsReaderCount: gcsReaderCount,
gcsRequestCount: gcsRequestCount,
gcsRequestLatency: gcsRequestLatency,
gcsDownloadBytesCount: gcsDownloadBytesCount,
fileCacheReadCount: fileCacheReadCount,
fileCacheReadBytesCount: fileCacheReadBytesCount,
fileCacheReadLatency: fileCacheReadLatency,
}, nil
}
6 changes: 6 additions & 0 deletions common/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/otel/metric"
)

type ShutdownFn func(ctx context.Context) error

// The default time buckets for latency metrics.
// The unit can however change for different units i.e. for one metric the unit could be microseconds and for another it could be milliseconds.
var defaultLatencyDistribution = metric.WithExplicitBucketBoundaries(1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)

// JoinShutdownFunc combines the provided shutdown functions into a single function.
func JoinShutdownFunc(shutdownFns ...ShutdownFn) ShutdownFn {
return func(ctx context.Context) error {
Expand Down
16 changes: 16 additions & 0 deletions internal/monitor/otelexporters.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
const serviceName = "gcsfuse"
const cloudMonitoringMetricPrefix = "custom.googleapis.com/gcsfuse/"

var allowedMetricPrefixes = []string{"fs/", "gcs/", "file_cache/"}

// SetupOTelMetricExporters sets up the metrics exporters
func SetupOTelMetricExporters(ctx context.Context, c *cfg.Config) (shutdownFn common.ShutdownFn) {
shutdownFns := make([]common.ShutdownFn, 0)
Expand All @@ -59,6 +61,8 @@ func SetupOTelMetricExporters(ctx context.Context, c *cfg.Config) (shutdownFn co
options = append(options, metric.WithResource(res))
}

options = append(options, metric.WithView(dropDisallowedMetricsView))

meterProvider := metric.NewMeterProvider(options...)
shutdownFns = append(shutdownFns, meterProvider.Shutdown)

Expand All @@ -67,6 +71,18 @@ func SetupOTelMetricExporters(ctx context.Context, c *cfg.Config) (shutdownFn co
return common.JoinShutdownFunc(shutdownFns...)
}

// dropUnwantedMetricsView is an OTel View that drops the metrics that don't match the allowed prefixes.
func dropDisallowedMetricsView(i metric.Instrument) (metric.Stream, bool) {
s := metric.Stream{Name: i.Name, Description: i.Description, Unit: i.Unit}
for _, prefix := range allowedMetricPrefixes {
if strings.HasPrefix(i.Name, prefix) {
return s, true
}
}
s.Aggregation = metric.AggregationDrop{}
return s, true
}

func setupCloudMonitoring(secs int64) ([]metric.Option, common.ShutdownFn) {
if secs <= 0 {
return nil, nil
Expand Down
Loading

0 comments on commit 840605f

Please sign in to comment.