Skip to content

Commit

Permalink
agent: allow configuration of in-memory telemetry sink. (#20166)
Browse files Browse the repository at this point in the history
This change adds configuration options for setting the in-memory
telemetry sink collection and retention durations. This sink backs
the metrics JSON API and previously had hard-coded default values.

The new options are particularly useful when running development or
debug environments, where metrics collection is desired at a fast
and granular rate.
  • Loading branch information
jrasell authored Mar 25, 2024
1 parent 02d98b9 commit facc3e8
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 34 deletions.
3 changes: 3 additions & 0 deletions .changelog/20166.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: allow configuration of in-memory telemetry sink
```
16 changes: 9 additions & 7 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
return false
}

if err := config.Telemetry.Validate(); err != nil {
c.Ui.Error(fmt.Sprintf("telemetry block invalid: %v", err))
return false
}

// Set up the TLS configuration properly if we have one.
// XXX chelseakomlo: set up a TLSConfig New method which would wrap
// constructor-type actions like this.
Expand Down Expand Up @@ -1155,14 +1160,8 @@ func (c *Command) handleReload() {
}
}

// setupTelemetry is used ot setup the telemetry sub-systems
// setupTelemetry is used to set up the telemetry sub-systems.
func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
/* Setup telemetry
Aggregate on 10 second intervals for 1 minute. Expose the
metrics over stderr when there is a SIGUSR1 received.
*/
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
metrics.DefaultInmemSignal(inm)

var telConfig *Telemetry
if config.Telemetry == nil {
Expand All @@ -1171,6 +1170,9 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
telConfig = config.Telemetry
}

inm := metrics.NewInmemSink(telConfig.inMemoryCollectionInterval, telConfig.inMemoryRetentionPeriod)
metrics.DefaultInmemSignal(inm)

metricsConf := metrics.DefaultConfig("nomad")
metricsConf.EnableHostname = !telConfig.DisableHostname

Expand Down
66 changes: 60 additions & 6 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,20 @@ func (s *ServerConfig) EncryptBytes() ([]byte, error) {

// Telemetry is the telemetry configuration for the server
type Telemetry struct {

// InMemoryCollectionInterval configures the in-memory sink collection
// interval. This sink is always configured and backs the JSON metrics API
// endpoint. This option is particularly useful for debugging or
// development.
InMemoryCollectionInterval string `hcl:"in_memory_collection_interval"`
inMemoryCollectionInterval time.Duration `hcl:"-"`

// InMemoryRetentionPeriod configures the in-memory sink retention period.
// This sink is always configured and backs the JSON metrics API endpoint.
// This option is particularly useful for debugging or development.
InMemoryRetentionPeriod string `hcl:"in_memory_retention_period"`
inMemoryRetentionPeriod time.Duration `hcl:"-"`

StatsiteAddr string `hcl:"statsite_address"`
StatsdAddr string `hcl:"statsd_address"`
DataDogAddr string `hcl:"datadog_address"`
Expand Down Expand Up @@ -1062,8 +1076,8 @@ func (t *Telemetry) Copy() *Telemetry {
}

// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
func (a *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range a.PrefixFilter {
func (t *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range t.PrefixFilter {
if rule == "" {
continue
}
Expand All @@ -1079,6 +1093,30 @@ func (a *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
return allowed, blocked, nil
}

// Validate checks the in-memory sink durations stored on the telemetry
// configuration. These options are used by the agent regardless of mode, which
// is why the check lives here rather than in a structs package.
//
// A nil receiver is considered valid, so callers may invoke Validate without
// first checking whether the telemetry block was set. An error is returned
// when either duration is zero or negative, or when the collection interval
// is longer than the retention period.
func (t *Telemetry) Validate() error {
	if t == nil {
		return nil
	}

	interval, retention := t.inMemoryCollectionInterval, t.inMemoryRetentionPeriod

	// Cases are evaluated top to bottom, so the positivity checks are reported
	// before the comparison between the two durations.
	switch {
	case interval <= 0:
		return errors.New("telemetry in-memory collection interval must be greater than zero")
	case retention <= 0:
		return errors.New("telemetry in-memory retention period must be greater than zero")
	case interval > retention:
		return errors.New("telemetry in-memory collection interval cannot be greater than retention period")
	}

	return nil
}

// Ports encapsulates the various ports we bind to for network services. If any
// are not specified then the defaults are used instead.
type Ports struct {
Expand Down Expand Up @@ -1391,8 +1429,12 @@ func DefaultConfig() *Config {
},
SyslogFacility: "LOCAL0",
Telemetry: &Telemetry{
CollectionInterval: "1s",
collectionInterval: 1 * time.Second,
InMemoryCollectionInterval: "10s",
inMemoryCollectionInterval: 10 * time.Second,
InMemoryRetentionPeriod: "1m",
inMemoryRetentionPeriod: 1 * time.Minute,
CollectionInterval: "1s",
collectionInterval: 1 * time.Second,
},
TLSConfig: &config.TLSConfig{},
Sentinel: &config.SentinelConfig{},
Expand Down Expand Up @@ -2371,9 +2413,21 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
}

// Merge is used to merge two telemetry configs together
func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
result := *a
func (t *Telemetry) Merge(b *Telemetry) *Telemetry {
result := *t

if b.InMemoryCollectionInterval != "" {
result.InMemoryCollectionInterval = b.InMemoryCollectionInterval
}
if b.inMemoryCollectionInterval != 0 {
result.inMemoryCollectionInterval = b.inMemoryCollectionInterval
}
if b.InMemoryRetentionPeriod != "" {
result.InMemoryRetentionPeriod = b.InMemoryRetentionPeriod
}
if b.inMemoryRetentionPeriod != 0 {
result.inMemoryRetentionPeriod = b.inMemoryRetentionPeriod
}
if b.StatsiteAddr != "" {
result.StatsiteAddr = b.StatsiteAddr
}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func ParseConfigFile(path string) (*Config, error) {
{"server.server_join.retry_interval", &c.Server.ServerJoin.RetryInterval, &c.Server.ServerJoin.RetryIntervalHCL, nil},
{"autopilot.server_stabilization_time", &c.Autopilot.ServerStabilizationTime, &c.Autopilot.ServerStabilizationTimeHCL, nil},
{"autopilot.last_contact_threshold", &c.Autopilot.LastContactThreshold, &c.Autopilot.LastContactThresholdHCL, nil},
{"telemetry.in_memory_collection_interval", &c.Telemetry.inMemoryCollectionInterval, &c.Telemetry.InMemoryCollectionInterval, nil},
{"telemetry.in_memory_retention_period", &c.Telemetry.inMemoryRetentionPeriod, &c.Telemetry.InMemoryRetentionPeriod, nil},
{"telemetry.collection_interval", &c.Telemetry.collectionInterval, &c.Telemetry.CollectionInterval, nil},
{"client.template.block_query_wait", nil, &c.Client.TemplateConfig.BlockQueryWaitTimeHCL,
func(d *time.Duration) {
Expand Down
42 changes: 33 additions & 9 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,19 @@ var basicConfig = &Config{
},
},
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.1:1234",
StatsdAddr: "127.0.0.1:2345",
PrometheusMetrics: true,
DisableHostname: true,
UseNodeName: false,
CollectionInterval: "3s",
collectionInterval: 3 * time.Second,
PublishAllocationMetrics: true,
PublishNodeMetrics: true,
StatsiteAddr: "127.0.0.1:1234",
StatsdAddr: "127.0.0.1:2345",
PrometheusMetrics: true,
DisableHostname: true,
UseNodeName: false,
InMemoryCollectionInterval: "1m",
inMemoryCollectionInterval: 1 * time.Minute,
InMemoryRetentionPeriod: "24h",
inMemoryRetentionPeriod: 24 * time.Hour,
CollectionInterval: "3s",
collectionInterval: 3 * time.Second,
PublishAllocationMetrics: true,
PublishNodeMetrics: true,
},
LeaveOnInt: true,
LeaveOnTerm: true,
Expand Down Expand Up @@ -1080,3 +1084,23 @@ func TestConfig_MultipleConsul(t *testing.T) {
})
}
}

// TestConfig_Telemetry verifies that Telemetry.Merge keeps the in-memory sink
// durations from the default configuration when the overlay leaves them unset,
// and replaces them when the overlay provides non-zero values.
func TestConfig_Telemetry(t *testing.T) {
	ci.Parallel(t)

	// Ensure merging a mostly empty struct correctly inherits default values
	// set.
	inputTelemetry1 := &Telemetry{PrometheusMetrics: true}
	mergedTelemetry1 := DefaultConfig().Telemetry.Merge(inputTelemetry1)

	// must.Eq takes the expected value first and the value under test second.
	must.Eq(t, 10*time.Second, mergedTelemetry1.inMemoryCollectionInterval)
	must.Eq(t, 1*time.Minute, mergedTelemetry1.inMemoryRetentionPeriod)

	// Ensure we can then overlay user specified data.
	inputTelemetry2 := &Telemetry{
		inMemoryCollectionInterval: 1 * time.Second,
		inMemoryRetentionPeriod:    10 * time.Second,
	}
	mergedTelemetry2 := mergedTelemetry1.Merge(inputTelemetry2)
	must.Eq(t, 1*time.Second, mergedTelemetry2.inMemoryCollectionInterval)
	must.Eq(t, 10*time.Second, mergedTelemetry2.inMemoryRetentionPeriod)
}
58 changes: 58 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package agent

import (
"errors"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -1385,6 +1386,63 @@ func TestTelemetry_PrefixFilters(t *testing.T) {
}
}

// TestTelemetry_Validate exercises Telemetry.Validate with a nil receiver, a
// valid pair of in-memory durations, and each combination it rejects. Every
// case name describes the condition that the input actually sets up.
func TestTelemetry_Validate(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		name           string
		inputTelemetry *Telemetry
		expectedError  error
	}{
		{
			name:           "nil",
			inputTelemetry: nil,
			expectedError:  nil,
		},
		{
			name: "collection interval greater than retention period",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 10 * time.Second,
				inMemoryRetentionPeriod:    1 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory collection interval cannot be greater than retention period"),
		},
		{
			name: "valid",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 1 * time.Second,
				inMemoryRetentionPeriod:    10 * time.Second,
			},
			expectedError: nil,
		},
		{
			// Only the retention period is set, so the collection interval
			// is left at its zero value.
			name: "missing in-memory collection interval",
			inputTelemetry: &Telemetry{
				inMemoryRetentionPeriod: 10 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory collection interval must be greater than zero"),
		},
		{
			// Only the collection interval is set, so the retention period
			// is left at its zero value.
			name: "missing in-memory retention period",
			inputTelemetry: &Telemetry{
				inMemoryCollectionInterval: 10 * time.Second,
			},
			expectedError: errors.New("telemetry in-memory retention period must be greater than zero"),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			actualError := tc.inputTelemetry.Validate()
			if tc.expectedError == nil {
				must.NoError(t, actualError)
				return
			}
			must.EqError(t, actualError, tc.expectedError.Error())
		})
	}
}

func TestTelemetry_Parse(t *testing.T) {
ci.Parallel(t)

Expand Down
16 changes: 9 additions & 7 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,15 @@ audit {
}

telemetry {
statsite_address = "127.0.0.1:1234"
statsd_address = "127.0.0.1:2345"
prometheus_metrics = true
disable_hostname = true
collection_interval = "3s"
publish_allocation_metrics = true
publish_node_metrics = true
in_memory_collection_interval = "1m"
in_memory_retention_period = "24h"
statsite_address = "127.0.0.1:1234"
statsd_address = "127.0.0.1:2345"
prometheus_metrics = true
disable_hostname = true
collection_interval = "3s"
publish_allocation_metrics = true
publish_node_metrics = true
}

leave_on_interrupt = true
Expand Down
14 changes: 9 additions & 5 deletions command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@
"server_service_name": "nomad",
"service_auth_method": "nomad-services",
"task_auth_method": "nomad-tasks",

"service_identity": {
"aud": [
"consul.io",
Expand Down Expand Up @@ -361,6 +360,8 @@
"syslog_facility": "LOCAL1",
"telemetry": [
{
"in_memory_collection_interval": "1m",
"in_memory_retention_period": "24h",
"collection_interval": "3s",
"disable_hostname": true,
"prometheus_metrics": true,
Expand Down Expand Up @@ -394,7 +395,10 @@
"cert_file": "/path/to/cert/file",
"create_from_role": "test_role",
"default_identity": {
"aud": ["vault.io", "nomad.io"],
"aud": [
"vault.io",
"nomad.io"
],
"env": false,
"file": true,
"ttl": "3h"
Expand All @@ -408,9 +412,9 @@
"token": "12345"
}
],
"reporting":{
"license":{
"enabled":"true"
"reporting": {
"license": {
"enabled": "true"
}
}
}
10 changes: 10 additions & 0 deletions website/content/docs/configuration/telemetry.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ parameters on this page are grouped by the telemetry provider.

The following options are available on all telemetry configurations.

- `in_memory_collection_interval` `(duration: 10s)` - Configures the in-memory
sink collection interval. This sink is always configured and backs the JSON
metrics API endpoint. This option is particularly useful for debugging or
development purposes, where aggressive collection is required.

- `in_memory_retention_period` `(duration: 1m)` - Configures the in-memory sink
retention period. This sink is always configured and backs the JSON metrics
API endpoint. This option is particularly useful for debugging or development
purposes.

- `disable_hostname` `(bool: false)` - Specifies if gauge values should be
prefixed with the local hostname.

Expand Down

0 comments on commit facc3e8

Please sign in to comment.