Skip to content

Commit

Permalink
Use a non-global metrics registry in Teleport (#50913)
Browse files Browse the repository at this point in the history
* Support a non-global registry in Teleport

* lint

* Update lib/service/service.go

Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com>

---------

Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
  • Loading branch information
hugoShaka and rosstimothy authored Jan 10, 2025
1 parent 4a10f05 commit 5b5bab9
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 3 deletions.
48 changes: 47 additions & 1 deletion lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/quic-go/quic-go"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -657,6 +658,15 @@ type TeleportProcess struct {
// resolver is used to identify the reverse tunnel address when connecting via
// the proxy.
resolver reversetunnelclient.Resolver

// metricRegistry is the prometheus metric registry for the process.
// Every teleport service that wants to register metrics should use this
// instead of the global prometheus.DefaultRegisterer to avoid registration
// conflicts.
//
// Both the metricsRegistry and the default global registry are gathered by
// Telepeort's metric service.
metricsRegistry *prometheus.Registry
}

// processIndex is an internal process index
Expand Down Expand Up @@ -1179,6 +1189,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
logger: cfg.Logger,
cloudLabels: cloudLabels,
TracingProvider: tracing.NoopProvider(),
metricsRegistry: cfg.MetricsRegistry,
}

process.registerExpectedServices(cfg)
Expand Down Expand Up @@ -3405,11 +3416,46 @@ func (process *TeleportProcess) initUploaderService() error {
return nil
}

// promHTTPLogAdapter adapts a slog.Logger into a promhttp.Logger.
type promHTTPLogAdapter struct {
ctx context.Context
*slog.Logger
}

// Println implements the promhttp.Logger interface.
func (l promHTTPLogAdapter) Println(v ...interface{}) {
//nolint:sloglint // msg cannot be constant
l.ErrorContext(l.ctx, fmt.Sprint(v...))
}

// initMetricsService starts the metrics service currently serving metrics for
// prometheus consumption
func (process *TeleportProcess) initMetricsService() error {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())

// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
gatherers := prometheus.Gatherers{
process.metricsRegistry,
prometheus.DefaultGatherer,
}

metricsHandler := promhttp.InstrumentMetricHandler(
process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
// Errors can happen if metrics are registered with identical names in both the local and the global registry.
// In this case, we log the error but continue collecting metrics. The first collected metric will win
// (the one from the local metrics registry takes precedence).
// As we move more things to the local registry, especially in other tools like tbot, we will have less
// conflicts in tests.
ErrorHandling: promhttp.ContinueOnError,
ErrorLog: promHTTPLogAdapter{
ctx: process.ExitContext(),
Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics),
},
}),
)

mux.Handle("/metrics", metricsHandler)

logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id))

Expand Down
86 changes: 84 additions & 2 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -39,6 +41,8 @@ import (
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1887,7 +1891,7 @@ func TestAgentRolloutController(t *testing.T) {
dataDir := makeTempDir(t)

cfg := servicecfg.MakeDefaultConfig()
// We use a real clock because too many sevrices are using the clock and it's not possible to accurately wait for
// We use a real clock because too many services are using the clock and it's not possible to accurately wait for
// each one of them to reach the point where they wait for the clock to advance. If we add a WaitUntil(X waiters)
// check, this will break the next time we add a new waiter.
cfg.Clock = clockwork.NewRealClock()
Expand All @@ -1906,7 +1910,7 @@ func TestAgentRolloutController(t *testing.T) {
process, err := NewTeleport(cfg)
require.NoError(t, err)

// Test setup: start the Teleport auth and wait for it to beocme ready
// Test setup: start the Teleport auth and wait for it to become ready
require.NoError(t, process.Start())

// Test setup: wait for every service to start
Expand Down Expand Up @@ -1949,6 +1953,84 @@ func TestAgentRolloutController(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)
}

func TestMetricsService(t *testing.T) {
t.Parallel()
// Test setup: create a listener for the metrics server, get its file descriptor.

// Note: this code is copied from integrations/helpers/NewListenerOn() to avoid including helpers in a production
// build and avoid a cyclic dependency.
metricsListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, metricsListener.Close())
})
require.IsType(t, &net.TCPListener{}, metricsListener)
metricsListenerFile, err := metricsListener.(*net.TCPListener).File()
require.NoError(t, err)

// Test setup: create a new teleport process
dataDir := makeTempDir(t)
cfg := servicecfg.MakeDefaultConfig()
cfg.DataDir = dataDir
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Proxy.Enabled = false
cfg.SSH.Enabled = false
cfg.DebugService.Enabled = false
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.Metrics.Enabled = true

// Configure the metrics server to use the listener we previously created.
cfg.Metrics.ListenAddr = &utils.NetAddr{AddrNetwork: "tcp", Addr: metricsListener.Addr().String()}
cfg.FileDescriptors = []*servicecfg.FileDescriptor{
{Type: string(ListenerMetrics), Address: metricsListener.Addr().String(), File: metricsListenerFile},
}

// Create and start the Teleport service.
process, err := NewTeleport(cfg)
require.NoError(t, err)
require.NoError(t, process.Start())
t.Cleanup(func() {
assert.NoError(t, process.Close())
assert.NoError(t, process.Wait())
})

// Test setup: create our test metrics.
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
localMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "local_metric_" + nonce,
})
globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "global_metric_" + nonce,
})
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)
_, err = process.WaitForEvent(ctx, MetricsReady)
require.NoError(t, err)

// Test execution: get metrics and check the tests metrics are here.
metricsURL, err := url.Parse("http://" + metricsListener.Addr().String())
require.NoError(t, err)
metricsURL.Path = "/metrics"
resp, err := http.Get(metricsURL.String())
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
}

// makeTempDir makes a temp dir with a shorter name than t.TempDir() in order to
// avoid https://github.com/golang/go/issues/62614.
func makeTempDir(t *testing.T) string {
Expand Down
11 changes: 11 additions & 0 deletions lib/service/servicecfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ghodss/yaml"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport"
Expand Down Expand Up @@ -264,6 +265,12 @@ type Config struct {
// protocol.
DatabaseREPLRegistry dbrepl.REPLRegistry

// MetricsRegistry is the prometheus metrics registry used by the Teleport process to register its metrics.
// As of today, not every Teleport metric is registered against this registry. Some Teleport services
// and Teleport dependencies are using the global registry.
// Both the MetricsRegistry and the default global registry are gathered by Teleport's metric service.
MetricsRegistry *prometheus.Registry

// token is either the token needed to join the auth server, or a path pointing to a file
// that contains the token
//
Expand Down Expand Up @@ -520,6 +527,10 @@ func ApplyDefaults(cfg *Config) {
cfg.LoggerLevel = new(slog.LevelVar)
}

if cfg.MetricsRegistry == nil {
cfg.MetricsRegistry = prometheus.NewRegistry()
}

// Remove insecure and (borderline insecure) cryptographic primitives from
// default configuration. These can still be added back in file configuration by
// users, but not supported by default by Teleport. See #1856 for more
Expand Down

0 comments on commit 5b5bab9

Please sign in to comment.