Skip to content

Commit

Permalink
Merge pull request #67 from Cloudzero/feature/cp-23050
Browse files Browse the repository at this point in the history
CP-23050: KSM Metric Validation
  • Loading branch information
bdrennz authored Dec 5, 2024
2 parents 33d5ebb + a42e5a8 commit f62ae72
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 33 deletions.
6 changes: 6 additions & 0 deletions pkg/config/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Prometheus struct {
Executable string `yaml:"executable" default:"/bin/prometheus" env:"PROMETHEUS_EXECUTABLE" env-description:"Prometheus Executable Path"`
KubeStateMetricsServiceEndpoint string `yaml:"kube_state_metrics_service_endpoint" env:"KMS_EP_URL" required:"true" env-description:"Kube State Metrics Service Endpoint"`
Configurations []string `yaml:"configurations"`
KubeMetrics []string `yaml:"kube_metrics"`
}

func (s *Prometheus) Validate() error {
Expand Down Expand Up @@ -43,5 +44,10 @@ func (s *Prometheus) Validate() error {
}
s.Configurations = cleanedPaths
}

if len(s.KubeMetrics) == 0 {
return errors.New("no KubeMetrics provided")
}

return nil
}
11 changes: 11 additions & 0 deletions pkg/config/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,34 @@ func TestPrometheus_Validate(t *testing.T) {
prom: config.Prometheus{
KubeStateMetricsServiceEndpoint: kmsServiceEndpoint,
Configurations: []string{scrapeConfigFile},
KubeMetrics: []string{"kube_node_info", "kube_pod_info"},
},
expected: nil,
},
{
name: "MissingKubeStateMetricsServiceEndpoint",
prom: config.Prometheus{
Configurations: []string{scrapeConfigFile},
KubeMetrics: []string{"kube_node_info", "kube_pod_info"},
},
expected: errors.New(config.ErrNoKubeStateMetricsServiceEndpointMsg),
},
{
name: "MissingScrapeConfigLocation",
prom: config.Prometheus{
KubeStateMetricsServiceEndpoint: kmsServiceEndpoint,
KubeMetrics: []string{"kube_node_info", "kube_pod_info"},
},
expected: nil,
},
{
name: "MissingKubeMetrics",
prom: config.Prometheus{
KubeStateMetricsServiceEndpoint: kmsServiceEndpoint,
Configurations: []string{scrapeConfigFile},
},
expected: errors.New("no KubeMetrics provided"),
},
}

for _, tt := range tests {
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/testdata/cloudzero-agent-validator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ cloudzero:

prometheus:
kube_state_metrics_service_endpoint: http://kube-state-metrics:8080
kube_metrics:
- kube_node_info
- kube_node_status_capacity
- kube_pod_container_resource_limits
- kube_pod_container_resource_requests
- kube_pod_labels
- kube_pod_info
- node_dmi_info
configurations:
- prometheus.yml

Expand Down
30 changes: 30 additions & 0 deletions pkg/diagnostic/catalog/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,34 @@ package catalog_test

import (
"context"
"net/http"
"testing"

"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/catalog"
"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/kms"
"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"
)

// mockProvider is a no-op diagnostic provider used as a test double so the
// catalog tests never touch a real Kubernetes cluster.
type mockProvider struct{}

// Check satisfies the diagnostic provider contract and always reports success
// without inspecting any of its arguments.
func (*mockProvider) Check(_ context.Context, _ *http.Client, _ status.Accessor) error {
	return nil
}

// mockKMSProvider matches the kms.NewProvider signature so tests can swap it
// in; it ignores every argument and hands back the no-op mockProvider.
func mockKMSProvider(_ context.Context, _ *config.Settings, _ ...kubernetes.Interface) diagnostic.Provider {
	return &mockProvider{}
}

func TestRegistry_Get(t *testing.T) {
// Override kms.NewProvider
originalNewProvider := kms.NewProvider
kms.NewProvider = mockKMSProvider
defer func() { kms.NewProvider = originalNewProvider }()

ctx := context.Background()
c := &config.Settings{}
r := catalog.NewCatalog(ctx, c)
Expand All @@ -28,6 +48,11 @@ func TestRegistry_Get(t *testing.T) {
}

func TestRegistry_Has(t *testing.T) {
// Override kms.NewProvider
originalNewProvider := kms.NewProvider
kms.NewProvider = mockKMSProvider
defer func() { kms.NewProvider = originalNewProvider }()

ctx := context.Background()
c := &config.Settings{}
r := catalog.NewCatalog(ctx, c)
Expand All @@ -42,6 +67,11 @@ func TestRegistry_Has(t *testing.T) {
}

func TestRegistry_List(t *testing.T) {
// Override kms.NewProvider
originalNewProvider := kms.NewProvider
kms.NewProvider = mockKMSProvider
defer func() { kms.NewProvider = originalNewProvider }()

ctx := context.Background()
c := &config.Settings{}
r := catalog.NewCatalog(ctx, c)
Expand Down
99 changes: 76 additions & 23 deletions pkg/diagnostic/kms/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package kms
import (
"context"
"fmt"
net "net/http"
"io"
"net/http"
"strings"
"time"

"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
"github.com/cloudzero/cloudzero-agent-validator/pkg/http"
"github.com/cloudzero/cloudzero-agent-validator/pkg/logging"
"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const DiagnosticKMS = config.DiagnosticKMS
Expand All @@ -23,46 +27,95 @@ var (
)

// checker implements the KSM diagnostic: it scrapes the kube-state-metrics
// endpoint and validates that the configured metrics are present.
//
// NOTE(review): this span contained both the pre- and post-merge field lists
// from the flattened diff (cfg/logger duplicated), which is not valid Go;
// reconstructed here as the post-merge struct.
type checker struct {
	cfg       *config.Settings     // validator settings (KSM endpoint + required metric names)
	logger    *logrus.Entry        // operation-scoped logger (op field "ksm")
	clientset kubernetes.Interface // injected by tests; built from cluster config in production
}

func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider {
var NewProvider = func(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider {
var cs kubernetes.Interface
if len(clientset) > 0 {
cs = clientset[0]
} else {
// Use the in-cluster config if running inside a cluster, otherwise use the default kubeconfig
config, err := rest.InClusterConfig()
if err != nil {
kubeconfig := clientcmd.RecommendedHomeFile
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
}

// Create the clientset
cs, err = kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
}

return &checker{
cfg: cfg,
logger: logging.NewLogger().
WithContext(ctx).WithField(logging.OpField, "ksm"),
clientset: cs,
}
}

func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error {
func (c *checker) Check(_ context.Context, client *http.Client, accessor status.Accessor) error {
var (
err error
retriesRemaining = MaxRetry
url = fmt.Sprintf("%s/", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint)
endpointURL = fmt.Sprintf("%s/metrics", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint)
)

// We need to build in a retry here because the kube-state-metrics service can take a few seconds to start up
// If it is deploying with the cloudzero-agent chart
for {
_, err = http.Do(ctx, client, net.MethodGet, nil, nil, url, nil)
if err == nil {
break
c.logger.Infof("Using endpoint URL: %s", endpointURL)

// Retry logic to handle transient issues
for attempt := 1; retriesRemaining > 0; attempt++ {
resp, err := client.Get(endpointURL)
if err != nil {
c.logger.Errorf("Failed to fetch metrics on attempt %d: %v", attempt, err)
retriesRemaining--
time.Sleep(RetryInterval)
continue
}

if resp.StatusCode != http.StatusOK {
c.logger.Errorf("Unexpected status code on attempt %d: %d", attempt, resp.StatusCode)
retriesRemaining--
time.Sleep(RetryInterval)
continue
}
if retriesRemaining == 0 {
break

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
c.logger.Errorf("Failed to read metrics on attempt %d: %v", attempt, err)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())})
return nil
}

metrics := string(body)
allMetricsFound := true
for _, metric := range c.cfg.Prometheus.KubeMetrics {
if strings.Contains(metrics, metric) {
c.logger.Infof("Found required metric %s on attempt %d", metric, attempt)
} else {
c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)})
allMetricsFound = false
}
}

if allMetricsFound {
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
return nil
}

retriesRemaining--
time.Sleep(RetryInterval)
}

if err != nil {
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: err.Error()})
return nil
}

accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)})
return nil

}
Loading

0 comments on commit f62ae72

Please sign in to comment.