Skip to content

Commit ee76d94

Browse files
[v17] In-process metrics registry (#51202)
* Use a non-global metrics registry in Teleport (#50913) * Support a non-global registry in Teleport * lint * Update lib/service/service.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --------- Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> * Serve metrics from the local registry in the diagnostic service (#51031) * Use local metrics registry in the diagnostic service * Test metrics are served by the diag service * Init local registry at runtime instead of config (#51074) --------- Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
1 parent edca124 commit ee76d94

File tree

3 files changed

+218
-2
lines changed

3 files changed

+218
-2
lines changed

lib/service/service.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"github.com/gravitational/roundtrip"
5555
"github.com/gravitational/trace"
5656
"github.com/jonboulle/clockwork"
57+
"github.com/prometheus/client_golang/prometheus"
5758
"github.com/prometheus/client_golang/prometheus/promhttp"
5859
"github.com/quic-go/quic-go"
5960
"github.com/sirupsen/logrus"
@@ -660,6 +661,15 @@ type TeleportProcess struct {
660661
// resolver is used to identify the reverse tunnel address when connecting via
661662
// the proxy.
662663
resolver reversetunnelclient.Resolver
664+
665+
// metricsRegistry is the prometheus metrics registry for the process.
666+
// Every teleport service that wants to register metrics should use this
667+
// instead of the global prometheus.DefaultRegisterer to avoid registration
668+
// conflicts.
669+
//
670+
// Both the metricsRegistry and the default global registry are gathered by
671+
// Teleport's metric service.
672+
metricsRegistry *prometheus.Registry
663673
}
664674

665675
// processIndex is an internal process index
@@ -1003,6 +1013,15 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
10031013
"pid", fmt.Sprintf("%v.%v", os.Getpid(), processID),
10041014
)
10051015

1016+
// Use the custom metrics registry if specified, else create a new one.
1017+
// We must create the registry in NewTeleport, as opposed to ApplyConfig(),
1018+
// because some tests are running multiple Teleport instances from the same
1019+
// config.
1020+
metricsRegistry := cfg.MetricsRegistry
1021+
if metricsRegistry == nil {
1022+
metricsRegistry = prometheus.NewRegistry()
1023+
}
1024+
10061025
// If FIPS mode was requested, make sure the binary is built against BoringCrypto.
10071026
if cfg.FIPS {
10081027
if !modules.GetModules().IsBoringBinary() {
@@ -1187,6 +1206,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
11871206
logger: cfg.Logger,
11881207
cloudLabels: cloudLabels,
11891208
TracingProvider: tracing.NoopProvider(),
1209+
metricsRegistry: metricsRegistry,
11901210
}
11911211

11921212
process.registerExpectedServices(cfg)
@@ -3398,11 +3418,23 @@ func (process *TeleportProcess) initUploaderService() error {
33983418
return nil
33993419
}
34003420

3421+
// promHTTPLogAdapter adapts a slog.Logger into a promhttp.Logger.
//
// Every message received through the adapter is logged at error level.
type promHTTPLogAdapter struct {
	// ctx is handed to the logger with every record. It is kept on the
	// struct because Println has no context parameter of its own.
	ctx context.Context
	*slog.Logger
}

// Println implements the promhttp.Logger interface.
//
// Operands are formatted with Println semantics: a space is always inserted
// between operands, regardless of their types. fmt.Sprint would only add a
// space when neither neighbor is a string, which glues a string prefix to
// the value that follows it (e.g. "error gathering metrics:" + err).
func (l promHTTPLogAdapter) Println(v ...any) {
	// Sprintln always terminates its output with exactly one newline, which
	// is dropped because the slog handler delimits records itself.
	msg := fmt.Sprintln(v...)
	msg = msg[:len(msg)-1]
	//nolint:sloglint // msg cannot be constant
	l.ErrorContext(l.ctx, msg)
}
3432+
34013433
// initMetricsService starts the metrics service currently serving metrics for
34023434
// prometheus consumption
34033435
func (process *TeleportProcess) initMetricsService() error {
34043436
mux := http.NewServeMux()
3405-
mux.Handle("/metrics", promhttp.Handler())
3437+
mux.Handle("/metrics", process.newMetricsHandler())
34063438

34073439
logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id))
34083440

@@ -3487,6 +3519,33 @@ func (process *TeleportProcess) initMetricsService() error {
34873519
return nil
34883520
}
34893521

3522+
// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the
3523+
// in-process one.
3524+
func (process *TeleportProcess) newMetricsHandler() http.Handler {
3525+
// We gather metrics both from the in-process registry (preferred metrics registration method)
3526+
// and the global registry (used by some Teleport services and many dependencies).
3527+
gatherers := prometheus.Gatherers{
3528+
process.metricsRegistry,
3529+
prometheus.DefaultGatherer,
3530+
}
3531+
3532+
metricsHandler := promhttp.InstrumentMetricHandler(
3533+
process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
3534+
// Errors can happen if metrics are registered with identical names in both the local and the global registry.
3535+
// In this case, we log the error but continue collecting metrics. The first collected metric will win
3536+
// (the one from the local metrics registry takes precedence).
3537+
// As we move more things to the local registry, especially in other tools like tbot, we will have less
3538+
// conflicts in tests.
3539+
ErrorHandling: promhttp.ContinueOnError,
3540+
ErrorLog: promHTTPLogAdapter{
3541+
ctx: process.ExitContext(),
3542+
Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics),
3543+
},
3544+
}),
3545+
)
3546+
return metricsHandler
3547+
}
3548+
34903549
// initDiagnosticService starts diagnostic service currently serving healthz
34913550
// and prometheus endpoints
34923551
func (process *TeleportProcess) initDiagnosticService() error {
@@ -3496,7 +3555,7 @@ func (process *TeleportProcess) initDiagnosticService() error {
34963555
// metrics will otherwise be served by the metrics service if it's enabled
34973556
// in the config.
34983557
if !process.Config.Metrics.Enabled {
3499-
mux.Handle("/metrics", promhttp.Handler())
3558+
mux.Handle("/metrics", process.newMetricsHandler())
35003559
}
35013560

35023561
if process.Config.Debug {

lib/service/service_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import (
2323
"crypto/tls"
2424
"errors"
2525
"fmt"
26+
"io"
2627
"log/slog"
2728
"net"
2829
"net/http"
30+
"net/url"
2931
"os"
3032
"path/filepath"
3133
"strings"
@@ -39,7 +41,9 @@ import (
3941
"github.com/google/uuid"
4042
"github.com/gravitational/trace"
4143
"github.com/jonboulle/clockwork"
44+
"github.com/prometheus/client_golang/prometheus"
4245
"github.com/sirupsen/logrus"
46+
"github.com/stretchr/testify/assert"
4347
"github.com/stretchr/testify/require"
4448
"golang.org/x/sync/errgroup"
4549
"google.golang.org/grpc"
@@ -1849,6 +1853,152 @@ func TestInitDatabaseService(t *testing.T) {
18491853
}
18501854
}
18511855

1856+
// TestMetricsService tests that the optional metrics service exposes
1857+
// metrics from both the in-process and global metrics registry. When the
1858+
// service is disabled, metrics are served by the diagnostics service
1859+
// (tested in TestMetricsInDiagnosticsService).
1860+
func TestMetricsService(t *testing.T) {
1861+
t.Parallel()
1862+
// Test setup: create a listener for the metrics server, get its file descriptor.
1863+
1864+
// Note: this code is copied from integrations/helpers/NewListenerOn() to avoid including helpers in a production
1865+
// build and avoid a cyclic dependency.
1866+
metricsListener, err := net.Listen("tcp", "127.0.0.1:0")
1867+
require.NoError(t, err)
1868+
t.Cleanup(func() {
1869+
assert.NoError(t, metricsListener.Close())
1870+
})
1871+
require.IsType(t, &net.TCPListener{}, metricsListener)
1872+
metricsListenerFile, err := metricsListener.(*net.TCPListener).File()
1873+
require.NoError(t, err)
1874+
1875+
// Test setup: create a new teleport process
1876+
dataDir := makeTempDir(t)
1877+
cfg := servicecfg.MakeDefaultConfig()
1878+
cfg.DataDir = dataDir
1879+
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
1880+
cfg.Auth.Enabled = true
1881+
cfg.Proxy.Enabled = false
1882+
cfg.SSH.Enabled = false
1883+
cfg.DebugService.Enabled = false
1884+
cfg.Auth.StorageConfig.Params["path"] = dataDir
1885+
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
1886+
cfg.Metrics.Enabled = true
1887+
1888+
// Configure the metrics server to use the listener we previously created.
1889+
cfg.Metrics.ListenAddr = &utils.NetAddr{AddrNetwork: "tcp", Addr: metricsListener.Addr().String()}
1890+
cfg.FileDescriptors = []*servicecfg.FileDescriptor{
1891+
{Type: string(ListenerMetrics), Address: metricsListener.Addr().String(), File: metricsListenerFile},
1892+
}
1893+
1894+
// Create and start the Teleport service.
1895+
process, err := NewTeleport(cfg)
1896+
require.NoError(t, err)
1897+
require.NoError(t, process.Start())
1898+
t.Cleanup(func() {
1899+
assert.NoError(t, process.Close())
1900+
assert.NoError(t, process.Wait())
1901+
})
1902+
1903+
// Test setup: create our test metrics.
1904+
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
1905+
localMetric := prometheus.NewGauge(prometheus.GaugeOpts{
1906+
Namespace: "test",
1907+
Name: "local_metric_" + nonce,
1908+
})
1909+
globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
1910+
Namespace: "test",
1911+
Name: "global_metric_" + nonce,
1912+
})
1913+
require.NoError(t, process.metricsRegistry.Register(localMetric))
1914+
require.NoError(t, prometheus.Register(globalMetric))
1915+
1916+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
1917+
t.Cleanup(cancel)
1918+
_, err = process.WaitForEvent(ctx, MetricsReady)
1919+
require.NoError(t, err)
1920+
1921+
// Test execution: get metrics and check the tests metrics are here.
1922+
metricsURL, err := url.Parse("http://" + metricsListener.Addr().String())
1923+
require.NoError(t, err)
1924+
metricsURL.Path = "/metrics"
1925+
resp, err := http.Get(metricsURL.String())
1926+
require.NoError(t, err)
1927+
require.Equal(t, http.StatusOK, resp.StatusCode)
1928+
1929+
body, err := io.ReadAll(resp.Body)
1930+
require.NoError(t, err)
1931+
require.NoError(t, resp.Body.Close())
1932+
1933+
// Test validation: check that the metrics server served both the local and global registry.
1934+
require.Contains(t, string(body), "local_metric_"+nonce)
1935+
require.Contains(t, string(body), "global_metric_"+nonce)
1936+
}
1937+
1938+
// TestMetricsInDiagnosticsService tests that the diagnostics service exposes
1939+
// metrics from both the in-process and global metrics registry when the metrics
1940+
// service is disabled.
1941+
func TestMetricsInDiagnosticsService(t *testing.T) {
1942+
t.Parallel()
1943+
// Test setup: create a new teleport process
1944+
dataDir := makeTempDir(t)
1945+
cfg := servicecfg.MakeDefaultConfig()
1946+
cfg.DataDir = dataDir
1947+
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
1948+
cfg.Auth.Enabled = true
1949+
cfg.Proxy.Enabled = false
1950+
cfg.SSH.Enabled = false
1951+
cfg.DebugService.Enabled = false
1952+
cfg.Auth.StorageConfig.Params["path"] = dataDir
1953+
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
1954+
cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
1955+
1956+
// Test setup: Create and start the Teleport service.
1957+
process, err := NewTeleport(cfg)
1958+
require.NoError(t, err)
1959+
require.NoError(t, process.Start())
1960+
t.Cleanup(func() {
1961+
assert.NoError(t, process.Close())
1962+
assert.NoError(t, process.Wait())
1963+
})
1964+
1965+
// Test setup: create our test metrics.
1966+
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
1967+
localMetric := prometheus.NewGauge(prometheus.GaugeOpts{
1968+
Namespace: "test",
1969+
Name: "local_metric_" + nonce,
1970+
})
1971+
globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
1972+
Namespace: "test",
1973+
Name: "global_metric_" + nonce,
1974+
})
1975+
require.NoError(t, process.metricsRegistry.Register(localMetric))
1976+
require.NoError(t, prometheus.Register(globalMetric))
1977+
1978+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
1979+
t.Cleanup(cancel)
1980+
_, err = process.WaitForEvent(ctx, TeleportReadyEvent)
1981+
require.NoError(t, err)
1982+
1983+
// Test execution: query the metrics endpoint and check the tests metrics are here.
1984+
diagAddr, err := process.DiagnosticAddr()
1985+
require.NoError(t, err)
1986+
metricsURL, err := url.Parse("http://" + diagAddr.String())
1987+
require.NoError(t, err)
1988+
metricsURL.Path = "/metrics"
1989+
resp, err := http.Get(metricsURL.String())
1990+
require.NoError(t, err)
1991+
require.Equal(t, http.StatusOK, resp.StatusCode)
1992+
1993+
body, err := io.ReadAll(resp.Body)
1994+
require.NoError(t, err)
1995+
require.NoError(t, resp.Body.Close())
1996+
1997+
// Test validation: check that the metrics server served both the local and global registry.
1998+
require.Contains(t, string(body), "local_metric_"+nonce)
1999+
require.Contains(t, string(body), "global_metric_"+nonce)
2000+
}
2001+
18522002
// makeTempDir makes a temp dir with a shorter name than t.TempDir() in order to
18532003
// avoid https://github.com/golang/go/issues/62614.
18542004
func makeTempDir(t *testing.T) string {

lib/service/servicecfg/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/ghodss/yaml"
3535
"github.com/gravitational/trace"
3636
"github.com/jonboulle/clockwork"
37+
"github.com/prometheus/client_golang/prometheus"
3738
"github.com/sirupsen/logrus"
3839
"golang.org/x/crypto/ssh"
3940

@@ -270,6 +271,12 @@ type Config struct {
270271
// protocol.
271272
DatabaseREPLRegistry dbrepl.REPLRegistry
272273

274+
// MetricsRegistry is the prometheus metrics registry used by the Teleport process to register its metrics.
275+
// As of today, not every Teleport metric is registered against this registry. Some Teleport services
276+
// and Teleport dependencies are using the global registry.
277+
// Both the MetricsRegistry and the default global registry are gathered by Teleport's metric service.
278+
MetricsRegistry *prometheus.Registry
279+
273280
// token is either the token needed to join the auth server, or a path pointing to a file
274281
// that contains the token
275282
//

0 commit comments

Comments
 (0)