Skip to content

Expose HTTP/gRPC metrics to OpenTelemetry as well #11564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/beater/api/mux.go
Original file line number Diff line number Diff line change
@@ -260,7 +260,7 @@ func apmMiddleware(m map[request.ResultID]*monitoring.Int) []middleware.Middlewa
middleware.LogMiddleware(),
middleware.TimeoutMiddleware(),
middleware.RecoverPanicMiddleware(),
middleware.MonitoringMiddleware(m),
middleware.MonitoringMiddleware(m, nil),
}
}

2 changes: 1 addition & 1 deletion internal/beater/beater.go
Original file line number Diff line number Diff line change
@@ -387,7 +387,7 @@ func (s *Runner) Run(ctx context.Context) error {
apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)),
interceptors.ClientMetadata(),
interceptors.Logging(gRPCLogger),
interceptors.Metrics(gRPCLogger),
interceptors.Metrics(gRPCLogger, nil),
interceptors.Timeout(),
interceptors.Auth(authenticator),
interceptors.AnonymousRateLimit(ratelimitStore),
86 changes: 64 additions & 22 deletions internal/beater/interceptors/metrics.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@ package interceptors
import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -47,38 +49,38 @@ type UnaryRequestMetrics interface {
RequestMetrics(fullMethod string) map[request.ResultID]*monitoring.Int
}

// Metrics returns a grpc.UnaryServerInterceptor that increments metrics
// for gRPC method calls.
//
// If a gRPC service implements UnaryRequestMetrics, its RequestMetrics
// method will be called to obtain the metrics map for incrementing. If the
// service does not implement UnaryRequestMetrics, but
// RegisterMethodUnaryRequestMetrics has been called for the invoked method,
// then the registered UnaryRequestMetrics will be used instead. Finally,
// if neither of these are available, a warning will be logged and no metrics
// will be gathered.
func Metrics(logger *logp.Logger) grpc.UnaryServerInterceptor {
// metricsInterceptor holds the state shared by every call handled by the
// gRPC metrics interceptor.
type metricsInterceptor struct {
// logger is used to warn when no metrics registry is found for a method.
logger *logp.Logger
// meter creates the OpenTelemetry counters on first use.
meter metric.Meter

// counters caches the OpenTelemetry counters created by getMetric, keyed
// by the "grpc.server."-prefixed result ID.
counters map[request.ResultID]metric.Int64Counter
}

func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
var m map[request.ResultID]*monitoring.Int
var ints map[request.ResultID]*monitoring.Int
if requestMetrics, ok := info.Server.(UnaryRequestMetrics); ok {
m = requestMetrics.RequestMetrics(info.FullMethod)
ints = requestMetrics.RequestMetrics(info.FullMethod)
} else {
m = methodUnaryRequestMetrics[info.FullMethod]
ints = methodUnaryRequestMetrics[info.FullMethod]
}
if m == nil {
logger.With(
if ints == nil {
m.logger.With(
"grpc.request.method", info.FullMethod,
).Warn("metrics registry missing")
return handler(ctx, req)
}

m[request.IDRequestCount].Inc()
defer m[request.IDResponseCount].Inc()
m.getMetric(request.IDRequestCount).Add(ctx, 1)
defer m.getMetric(request.IDResponseCount).Add(ctx, 1)

ints[request.IDRequestCount].Inc()
defer ints[request.IDResponseCount].Inc()

resp, err := handler(ctx, req)

@@ -88,17 +90,57 @@ func Metrics(logger *logp.Logger) grpc.UnaryServerInterceptor {
if s, ok := status.FromError(err); ok {
switch s.Code() {
case codes.Unauthenticated:
m[request.IDResponseErrorsUnauthorized].Inc()
m.getMetric(request.IDResponseErrorsUnauthorized).Add(ctx, 1)
ints[request.IDResponseErrorsUnauthorized].Inc()
case codes.DeadlineExceeded, codes.Canceled:
m[request.IDResponseErrorsTimeout].Inc()
m.getMetric(request.IDResponseErrorsTimeout).Add(ctx, 1)
ints[request.IDResponseErrorsTimeout].Inc()
case codes.ResourceExhausted:
m[request.IDResponseErrorsRateLimit].Inc()
m.getMetric(request.IDResponseErrorsRateLimit).Add(ctx, 1)
ints[request.IDResponseErrorsRateLimit].Inc()
}
}
}

m[responseID].Inc()
m.getMetric(responseID).Add(ctx, 1)
ints[responseID].Inc()

return resp, err
}
}

// getMetric returns the OpenTelemetry counter for result ID n, creating it
// through m.meter on first use. The instrument is named "grpc.server."
// followed by n, and that same prefixed value is the key into m.counters.
//
// Any error returned by the meter while creating the counter is discarded.
//
// NOTE(review): m.counters is read and written here without any locking;
// if the interceptor serves calls concurrently this looks like a data
// race — confirm and guard the map or pre-create the counters.
func (m *metricsInterceptor) getMetric(n request.ResultID) metric.Int64Counter {
	key := "grpc.server." + n
	counter, cached := m.counters[key]
	if cached {
		return counter
	}

	counter, _ = m.meter.Int64Counter(string(key))
	m.counters[key] = counter
	return counter
}

// Metrics builds a grpc.UnaryServerInterceptor that counts gRPC method
// calls and their outcomes.
//
// The metrics map to increment is looked up per call: when the gRPC service
// implements UnaryRequestMetrics, its RequestMetrics method supplies the
// map; otherwise the map registered for the invoked method through
// RegisterMethodUnaryRequestMetrics is used. When neither yields a map, a
// warning is logged and the call is handled without gathering metrics.
//
// OpenTelemetry counters are created from mp. A nil mp selects the global
// meter provider returned by otel.GetMeterProvider.
func Metrics(logger *logp.Logger, mp metric.MeterProvider) grpc.UnaryServerInterceptor {
	provider := mp
	if provider == nil {
		provider = otel.GetMeterProvider()
	}

	return (&metricsInterceptor{
		logger:   logger,
		meter:    provider.Meter("internal/beater/interceptors"),
		counters: make(map[request.ResultID]metric.Int64Counter),
	}).Interceptor()
}
63 changes: 58 additions & 5 deletions internal/beater/interceptors/metrics_test.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -41,13 +43,20 @@ var monitoringKeys = append(
)

func TestMetrics(t *testing.T) {
reader := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
},
))
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

registry := monitoring.NewRegistry()

monitoringMap := request.MonitoringMapForRegistry(registry, monitoringKeys)
methodName := "test_method_name"
logger := logp.NewLogger("interceptor.metrics.test")

interceptor := Metrics(logger)
interceptor := Metrics(logger, mp)

ctx := context.Background()
info := &grpc.UnaryServerInfo{
@@ -59,10 +68,13 @@ func TestMetrics(t *testing.T) {
}

for _, tc := range []struct {
name string
f func(ctx context.Context, req interface{}) (interface{}, error)
monitoringInt map[request.ResultID]int64
expectedOtel map[string]int64
}{
{
name: "with an error",
f: func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, errors.New("error")
},
@@ -71,13 +83,18 @@ func TestMetrics(t *testing.T) {
request.IDResponseCount: 1,
request.IDResponseValidCount: 0,
request.IDResponseErrorsCount: 1,
request.IDResponseErrorsInternal: 1,
request.IDResponseErrorsRateLimit: 0,
request.IDResponseErrorsTimeout: 0,
request.IDResponseErrorsUnauthorized: 0,
},
expectedOtel: map[string]int64{
"grpc.server." + string(request.IDRequestCount): 1,
"grpc.server." + string(request.IDResponseCount): 1,
"grpc.server." + string(request.IDResponseErrorsCount): 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: why do we have an additional IDResponseErrorsInternal in monitoringInt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't really figured out why, but nothing can set it from this interceptor, so I suspect it's because something elsewhere sets it, only for monitoring.Int.
The only location that sets it though is jaeger, and it's not executed here. So it may be state from a previous test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the metric from monitoring.Int, and the test doesn't fail either. So it does seem it's not being incremented in that interceptor (and the monitoring.Int test doesn't check for metrics that aren't emitted).

},
},
{
name: "with an unauthenticated error",
f: func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, status.Error(codes.Unauthenticated, "error")
},
@@ -91,8 +108,15 @@ func TestMetrics(t *testing.T) {
request.IDResponseErrorsTimeout: 0,
request.IDResponseErrorsUnauthorized: 1,
},
expectedOtel: map[string]int64{
"grpc.server." + string(request.IDRequestCount): 1,
"grpc.server." + string(request.IDResponseCount): 1,
"grpc.server." + string(request.IDResponseErrorsCount): 1,
"grpc.server." + string(request.IDResponseErrorsUnauthorized): 1,
},
},
{
name: "with a deadline exceeded error",
f: func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, status.Error(codes.DeadlineExceeded, "request timed out")
},
@@ -106,8 +130,15 @@ func TestMetrics(t *testing.T) {
request.IDResponseErrorsTimeout: 1,
request.IDResponseErrorsUnauthorized: 0,
},
expectedOtel: map[string]int64{
"grpc.server." + string(request.IDRequestCount): 1,
"grpc.server." + string(request.IDResponseCount): 1,
"grpc.server." + string(request.IDResponseErrorsCount): 1,
"grpc.server." + string(request.IDResponseErrorsTimeout): 1,
},
},
{
name: "with a canceled error",
f: func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, status.Error(codes.Canceled, "request timed out")
},
@@ -121,8 +152,15 @@ func TestMetrics(t *testing.T) {
request.IDResponseErrorsTimeout: 1,
request.IDResponseErrorsUnauthorized: 0,
},
expectedOtel: map[string]int64{
"grpc.server." + string(request.IDRequestCount): 1,
"grpc.server." + string(request.IDResponseCount): 1,
"grpc.server." + string(request.IDResponseErrorsCount): 1,
"grpc.server." + string(request.IDResponseErrorsTimeout): 1,
},
},
{
name: "with a resource exhausted error",
f: func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
},
@@ -136,8 +174,15 @@ func TestMetrics(t *testing.T) {
request.IDResponseErrorsTimeout: 0,
request.IDResponseErrorsUnauthorized: 0,
},
expectedOtel: map[string]int64{
"grpc.server." + string(request.IDRequestCount): 1,
"grpc.server." + string(request.IDResponseCount): 1,
"grpc.server." + string(request.IDResponseErrorsCount): 1,
"grpc.server." + string(request.IDResponseErrorsRateLimit): 1,
},
},
{
name: "with a success",
f: func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
},
@@ -151,11 +196,19 @@ func TestMetrics(t *testing.T) {
request.IDResponseErrorsTimeout: 0,
request.IDResponseErrorsUnauthorized: 0,
},
expectedOtel: map[string]int64{
"grpc.server." + string(request.IDRequestCount): 1,
"grpc.server." + string(request.IDResponseCount): 1,
"grpc.server." + string(request.IDResponseValidCount): 1,
},
},
} {
interceptor(ctx, nil, info, tc.f)
assertMonitoring(t, tc.monitoringInt, monitoringMap)
monitoringtest.ClearRegistry(monitoringMap)
t.Run(tc.name, func(t *testing.T) {
interceptor(ctx, nil, info, tc.f)
assertMonitoring(t, tc.monitoringInt, monitoringMap)
monitoringtest.ClearRegistry(monitoringMap)
monitoringtest.ExpectOtelMetrics(t, reader, tc.expectedOtel)
})
}
}

71 changes: 57 additions & 14 deletions internal/beater/middleware/monitoring_middleware.go
Original file line number Diff line number Diff line change
@@ -18,36 +18,79 @@
package middleware

import (
"context"
"net/http"

"github.com/elastic/elastic-agent-libs/monitoring"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

"github.com/elastic/apm-server/internal/beater/request"
"github.com/elastic/elastic-agent-libs/monitoring"
)

// MonitoringMiddleware returns a middleware that increases monitoring counters for collecting metrics
// about request processing. As input parameter it takes a map capable of mapping a request.ResultID to a counter.
func MonitoringMiddleware(m map[request.ResultID]*monitoring.Int) Middleware {
// monitoringMiddleware holds the state shared by every request handled by
// the HTTP monitoring middleware.
type monitoringMiddleware struct {
// meter creates the OpenTelemetry counters on first use.
meter metric.Meter

// ints maps result IDs to the monitoring counters incremented by inc.
ints map[request.ResultID]*monitoring.Int
// counters caches the OpenTelemetry counters created by getMetric, keyed
// by the "http.server."-prefixed result ID.
counters map[request.ResultID]metric.Int64Counter
}

func (m *monitoringMiddleware) Middleware() Middleware {
return func(h request.Handler) (request.Handler, error) {
inc := func(id request.ResultID) {
if counter, ok := m[id]; ok {
counter.Inc()
}
}
return func(c *request.Context) {
inc(request.IDRequestCount)
ctx := context.Background()

m.getMetric(request.IDRequestCount).Add(ctx, 1)
m.inc(request.IDRequestCount)

h(c)

inc(request.IDResponseCount)
m.getMetric(request.IDResponseCount).Add(ctx, 1)
m.inc(request.IDResponseCount)
if c.Result.StatusCode >= http.StatusBadRequest {
inc(request.IDResponseErrorsCount)
m.getMetric(request.IDResponseErrorsCount).Add(ctx, 1)
m.inc(request.IDResponseErrorsCount)
} else {
inc(request.IDResponseValidCount)
m.getMetric(request.IDResponseValidCount).Add(ctx, 1)
m.inc(request.IDResponseValidCount)
}

inc(c.Result.ID)
m.getMetric(c.Result.ID).Add(ctx, 1)
m.inc(c.Result.ID)
}, nil

}
}

// inc increments the monitoring counter registered for id in m.ints.
// Result IDs with no entry in the map are silently ignored.
func (m *monitoringMiddleware) inc(id request.ResultID) {
	counter, found := m.ints[id]
	if !found {
		return
	}
	counter.Inc()
}

func (m *monitoringMiddleware) getMetric(n request.ResultID) metric.Int64Counter {
name := "http.server." + n
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named those metrics http.server.xxx, and not apm-server.http.server.xxx, as the former better matches the otel semantic conventions.
https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-metrics.md

We can rename those metrics in the local exporter.

if met, ok := m.counters[name]; ok {
return met
}

nm, _ := m.meter.Int64Counter(string(name))
m.counters[name] = nm
return nm
}

// MonitoringMiddleware builds a middleware that counts requests and their
// results while they are processed.
//
// m maps each request.ResultID to the monitoring counter to increment for
// it. OpenTelemetry counters are created from mp; a nil mp selects the
// global meter provider returned by otel.GetMeterProvider.
func MonitoringMiddleware(m map[request.ResultID]*monitoring.Int, mp metric.MeterProvider) Middleware {
	provider := mp
	if provider == nil {
		provider = otel.GetMeterProvider()
	}

	return (&monitoringMiddleware{
		meter:    provider.Meter("internal/beater/middleware"),
		ints:     m,
		counters: make(map[request.ResultID]metric.Int64Counter),
	}).Middleware()
}
65 changes: 57 additions & 8 deletions internal/beater/middleware/monitoring_middleware_test.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/elastic/elastic-agent-libs/monitoring"

@@ -38,13 +40,23 @@ func TestMonitoringHandler(t *testing.T) {
checkMonitoring := func(t *testing.T,
h func(*request.Context),
expected map[request.ResultID]int,
expectedOtel map[string]int64,
m map[request.ResultID]*monitoring.Int,
) {
reader := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
},
))
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

monitoringtest.ClearRegistry(m)
c, _ := DefaultContextWithResponseRecorder()
Apply(MonitoringMiddleware(m), h)(c)
Apply(MonitoringMiddleware(m, mp), h)(c)
equal, result := monitoringtest.CompareMonitoringInt(expected, m)
assert.True(t, equal, result)

monitoringtest.ExpectOtelMetrics(t, reader, expectedOtel)
}

t.Run("Error", func(t *testing.T) {
@@ -54,8 +66,16 @@ func TestMonitoringHandler(t *testing.T) {
request.IDRequestCount: 1,
request.IDResponseCount: 1,
request.IDResponseErrorsCount: 1,
request.IDResponseErrorsForbidden: 1},
mockMonitoring)
request.IDResponseErrorsForbidden: 1,
},
map[string]int64{
"http.server." + string(request.IDRequestCount): 1,
"http.server." + string(request.IDResponseCount): 1,
"http.server." + string(request.IDResponseErrorsCount): 1,
"http.server." + string(request.IDResponseErrorsForbidden): 1,
},
mockMonitoring,
)
})

t.Run("Accepted", func(t *testing.T) {
@@ -65,8 +85,16 @@ func TestMonitoringHandler(t *testing.T) {
request.IDRequestCount: 1,
request.IDResponseCount: 1,
request.IDResponseValidCount: 1,
request.IDResponseValidAccepted: 1},
mockMonitoring)
request.IDResponseValidAccepted: 1,
},
map[string]int64{
"http.server." + string(request.IDRequestCount): 1,
"http.server." + string(request.IDResponseCount): 1,
"http.server." + string(request.IDResponseValidCount): 1,
"http.server." + string(request.IDResponseValidAccepted): 1,
},
mockMonitoring,
)
})

t.Run("Idle", func(t *testing.T) {
@@ -76,8 +104,16 @@ func TestMonitoringHandler(t *testing.T) {
request.IDRequestCount: 1,
request.IDResponseCount: 1,
request.IDResponseValidCount: 1,
request.IDUnset: 1},
mockMonitoring)
request.IDUnset: 1,
},
map[string]int64{
"http.server." + string(request.IDRequestCount): 1,
"http.server." + string(request.IDResponseCount): 1,
"http.server." + string(request.IDResponseValidCount): 1,
"http.server." + string(request.IDUnset): 1,
},
mockMonitoring,
)
})

t.Run("Panic", func(t *testing.T) {
@@ -89,13 +125,26 @@ func TestMonitoringHandler(t *testing.T) {
request.IDResponseErrorsCount: 1,
request.IDResponseErrorsInternal: 1,
},
map[string]int64{
"http.server." + string(request.IDRequestCount): 1,
"http.server." + string(request.IDResponseCount): 1,
"http.server." + string(request.IDResponseErrorsCount): 1,
"http.server." + string(request.IDResponseErrorsInternal): 1,
},
mockMonitoring)
})

t.Run("Nil", func(t *testing.T) {
checkMonitoring(t,
HandlerIdle,
map[request.ResultID]int{},
mockMonitoringNil)
map[string]int64{
"http.server." + string(request.IDRequestCount): 1,
"http.server." + string(request.IDResponseCount): 1,
"http.server." + string(request.IDResponseValidCount): 1,
"http.server." + string(request.IDUnset): 1,
},
mockMonitoringNil,
)
})
}
59 changes: 59 additions & 0 deletions internal/beater/monitoringtest/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package monitoringtest

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// ExpectOtelMetrics collects the metrics currently available from reader and
// asserts that the int64 sum metrics it finds are exactly those named in
// expectedMetrics, each carrying a single data point with the expected value.
//
// Metrics whose data is not a metricdata.Sum[int64] are ignored. Failures
// are reported through t without stopping the test.
func ExpectOtelMetrics(t *testing.T, reader sdkmetric.Reader, expectedMetrics map[string]int64) {
	t.Helper()

	var rm metricdata.ResourceMetrics
	assert.NoError(t, reader.Collect(context.Background(), &rm))

	assert.NotEqual(t, 0, len(rm.ScopeMetrics))
	foundMetrics := make([]string, 0, len(expectedMetrics))
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			d, ok := m.Data.(metricdata.Sum[int64])
			if !ok {
				continue
			}
			foundMetrics = append(foundMetrics, m.Name)

			// Report a wrong data point count as a failure and move on,
			// rather than indexing into a possibly empty slice and panicking.
			if !assert.Len(t, d.DataPoints, 1, m.Name) {
				continue
			}

			if v, ok := expectedMetrics[m.Name]; ok {
				assert.Equal(t, v, d.DataPoints[0].Value, m.Name)
			} else {
				assert.Fail(t, "unexpected metric", m.Name)
			}
		}
	}

	expectedMetricsKeys := make([]string, 0, len(expectedMetrics))
	for k := range expectedMetrics {
		expectedMetricsKeys = append(expectedMetricsKeys, k)
	}
	assert.ElementsMatch(t, expectedMetricsKeys, foundMetrics)
}
2 changes: 1 addition & 1 deletion internal/beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
@@ -190,7 +190,7 @@ func newGRPCServer(t *testing.T, batchProcessor modelpb.BatchProcessor) *grpc.Cl
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
logger := logp.NewLogger("otlp.grpc.test")
srv := grpc.NewServer(grpc.UnaryInterceptor(interceptors.Metrics(logger)))
srv := grpc.NewServer(grpc.UnaryInterceptor(interceptors.Metrics(logger, nil)))
semaphore := semaphore.NewWeighted(1)
otlp.RegisterGRPCServices(srv, zap.NewNop(), batchProcessor, semaphore)