diff --git a/pkg/config/prometheus.go b/pkg/config/prometheus.go
index 1a39192..483157a 100644
--- a/pkg/config/prometheus.go
+++ b/pkg/config/prometheus.go
@@ -11,6 +11,7 @@ type Prometheus struct {
 	Executable                      string   `yaml:"executable" default:"/bin/prometheus" env:"PROMETHEUS_EXECUTABLE" env-description:"Prometheus Executable Path"`
 	KubeStateMetricsServiceEndpoint string   `yaml:"kube_state_metrics_service_endpoint" env:"KMS_EP_URL" required:"true" env-description:"Kube State Metrics Service Endpoint"`
 	Configurations                  []string `yaml:"configurations"`
+	KubeMetrics                     []string `yaml:"kube_metrics"`
 }

 func (s *Prometheus) Validate() error {
@@ -43,5 +44,10 @@ func (s *Prometheus) Validate() error {
 		}
 		s.Configurations = cleanedPaths
 	}
+
+	if len(s.KubeMetrics) == 0 {
+		return errors.New("no KubeMetrics provided")
+	}
+
 	return nil
 }
diff --git a/pkg/config/prometheus_test.go b/pkg/config/prometheus_test.go
index efabb40..b809887 100644
--- a/pkg/config/prometheus_test.go
+++ b/pkg/config/prometheus_test.go
@@ -23,6 +23,7 @@ func TestPrometheus_Validate(t *testing.T) {
 			prom: config.Prometheus{
 				KubeStateMetricsServiceEndpoint: kmsServiceEndpoint,
 				Configurations:                  []string{scrapeConfigFile},
+				KubeMetrics:                     []string{"kube_node_info", "kube_pod_info"},
 			},
 			expected: nil,
 		},
@@ -30,6 +31,7 @@
 			name: "MissingKubeStateMetricsServiceEndpoint",
 			prom: config.Prometheus{
 				Configurations: []string{scrapeConfigFile},
+				KubeMetrics:    []string{"kube_node_info", "kube_pod_info"},
 			},
 			expected: errors.New(config.ErrNoKubeStateMetricsServiceEndpointMsg),
 		},
@@ -37,9 +39,18 @@
 			name: "MissingScrapeConfigLocation",
 			prom: config.Prometheus{
 				KubeStateMetricsServiceEndpoint: kmsServiceEndpoint,
+				KubeMetrics:                     []string{"kube_node_info", "kube_pod_info"},
 			},
 			expected: nil,
 		},
+		{
+			name: "MissingKubeMetrics",
+			prom: config.Prometheus{
+				KubeStateMetricsServiceEndpoint: kmsServiceEndpoint,
+				Configurations:                  []string{scrapeConfigFile},
+			},
+			expected: errors.New("no KubeMetrics provided"),
+		},
 	}

 	for _, tt := range tests {
diff --git a/pkg/config/testdata/cloudzero-agent-validator.yml b/pkg/config/testdata/cloudzero-agent-validator.yml
index 26053b3..181323f 100644
--- a/pkg/config/testdata/cloudzero-agent-validator.yml
+++ b/pkg/config/testdata/cloudzero-agent-validator.yml
@@ -18,6 +18,14 @@ cloudzero:

 prometheus:
   kube_state_metrics_service_endpoint: http://kube-state-metrics:8080
+  kube_metrics:
+    - kube_node_info
+    - kube_node_status_capacity
+    - kube_pod_container_resource_limits
+    - kube_pod_container_resource_requests
+    - kube_pod_labels
+    - kube_pod_info
+    - node_dmi_info
   configurations:
     - prometheus.yml

diff --git a/pkg/diagnostic/catalog/catalog_test.go b/pkg/diagnostic/catalog/catalog_test.go
index 9e32e9e..748f6a6 100644
--- a/pkg/diagnostic/catalog/catalog_test.go
+++ b/pkg/diagnostic/catalog/catalog_test.go
@@ -2,14 +2,34 @@ package catalog_test

 import (
 	"context"
+	"net/http"
 	"testing"

 	"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
+	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/catalog"
+	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/kms"
+	"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
 	"github.com/stretchr/testify/assert"
+	"k8s.io/client-go/kubernetes"
 )

+type mockProvider struct{}
+
+func (m *mockProvider) Check(ctx context.Context, client *http.Client, recorder status.Accessor) error {
+	return nil
+}
+
+func mockKMSProvider(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider {
+	return &mockProvider{}
+}
+
 func TestRegistry_Get(t *testing.T) {
+	// Override kms.NewProvider
+	originalNewProvider := kms.NewProvider
+	kms.NewProvider = mockKMSProvider
+	defer func() { kms.NewProvider = originalNewProvider }()
+
 	ctx := context.Background()
 	c := &config.Settings{}
 	r := catalog.NewCatalog(ctx, c)
@@ -28,6 +48,11 @@
 }

 func TestRegistry_Has(t *testing.T) {
+	// Override kms.NewProvider
+	originalNewProvider := kms.NewProvider
+	kms.NewProvider = mockKMSProvider
+	defer func() { kms.NewProvider = originalNewProvider }()
+
 	ctx := context.Background()
 	c := &config.Settings{}
 	r := catalog.NewCatalog(ctx, c)
@@ -42,6 +67,11 @@
 }

 func TestRegistry_List(t *testing.T) {
+	// Override kms.NewProvider
+	originalNewProvider := kms.NewProvider
+	kms.NewProvider = mockKMSProvider
+	defer func() { kms.NewProvider = originalNewProvider }()
+
 	ctx := context.Background()
 	c := &config.Settings{}
 	r := catalog.NewCatalog(ctx, c)
diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go
index 255ea3c..c6ac5ca 100644
--- a/pkg/diagnostic/kms/check.go
+++ b/pkg/diagnostic/kms/check.go
@@ -3,15 +3,19 @@ package kms
 import (
 	"context"
 	"fmt"
-	net "net/http"
+	"io"
+	"net/http"
+	"strings"
 	"time"

 	"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
-	"github.com/cloudzero/cloudzero-agent-validator/pkg/http"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/logging"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
 	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
 )

 const DiagnosticKMS = config.DiagnosticKMS
@@ -23,46 +27,95 @@ var (
 )

 type checker struct {
-	cfg    *config.Settings
-	logger *logrus.Entry
+	cfg       *config.Settings
+	logger    *logrus.Entry
+	clientset kubernetes.Interface
 }

-func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider {
+var NewProvider = func(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider {
+	var cs kubernetes.Interface
+	if len(clientset) > 0 {
+		cs = clientset[0]
+	} else {
+		// Use the in-cluster config if running inside a cluster, otherwise use the default kubeconfig
+		config, err := rest.InClusterConfig()
+		if err != nil {
+			kubeconfig := clientcmd.RecommendedHomeFile
+			config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+			if err != nil {
+				panic(err.Error())
+			}
+		}
+
+		// Create the clientset
+		cs, err = kubernetes.NewForConfig(config)
+		if err != nil {
+			panic(err.Error())
+		}
+	}
+
 	return &checker{
 		cfg: cfg,
 		logger: logging.NewLogger().
 			WithContext(ctx).WithField(logging.OpField, "ksm"),
+		clientset: cs,
 	}
 }

-func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error {
+func (c *checker) Check(_ context.Context, client *http.Client, accessor status.Accessor) error {
 	var (
-		err              error
 		retriesRemaining = MaxRetry
-		url              = fmt.Sprintf("%s/", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint)
+		endpointURL      = fmt.Sprintf("%s/metrics", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint)
 	)

-	// We need to build in a retry here because the kube-state-metrics service can take a few seconds to start up
-	// If it is deploying with the cloudzero-agent chart
-	for {
-		_, err = http.Do(ctx, client, net.MethodGet, nil, nil, url, nil)
-		if err == nil {
-			break
+	c.logger.Infof("Using endpoint URL: %s", endpointURL)
+
+	// Retry logic to handle transient issues
+	for attempt := 1; retriesRemaining > 0; attempt++ {
+		resp, err := client.Get(endpointURL)
+		if err != nil {
+			c.logger.Errorf("Failed to fetch metrics on attempt %d: %v", attempt, err)
+			retriesRemaining--
+			time.Sleep(RetryInterval)
+			continue
+		}
+
+		if resp.StatusCode != http.StatusOK {
+			c.logger.Errorf("Unexpected status code on attempt %d: %d", attempt, resp.StatusCode)
+			retriesRemaining--
+			time.Sleep(RetryInterval)
+			continue
 		}
-		if retriesRemaining == 0 {
-			break
+
+		defer resp.Body.Close()
+		body, err := io.ReadAll(resp.Body)
+		if err != nil {
+			c.logger.Errorf("Failed to read metrics on attempt %d: %v", attempt, err)
+			accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())})
+			return nil
+		}
+
+		metrics := string(body)
+		allMetricsFound := true
+		for _, metric := range c.cfg.Prometheus.KubeMetrics {
+			if strings.Contains(metrics, metric) {
+				c.logger.Infof("Found required metric %s on attempt %d", metric, attempt)
+			} else {
+				c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt)
+				accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)})
+				allMetricsFound = false
+			}
+		}
+
+		if allMetricsFound {
+			accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
+			return nil
 		}
 		retriesRemaining--
 		time.Sleep(RetryInterval)
 	}

-	if err != nil {
-		accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: err.Error()})
-		return nil
-	}
-
-	accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
+	accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)})
 	return nil
-
 }
diff --git a/pkg/diagnostic/kms/check_test.go b/pkg/diagnostic/kms/check_test.go
index dcb1573..719bfc9 100644
--- a/pkg/diagnostic/kms/check_test.go
+++ b/pkg/diagnostic/kms/check_test.go
@@ -3,10 +3,16 @@ package kms_test
 import (
 	"context"
 	"net/http"
+	"strings"
 	"testing"
 	"time"

 	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
+	k8stesting "k8s.io/client-go/testing"

 	"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/kms"
@@ -22,16 +28,41 @@ func makeReport() status.Accessor {
 	return status.NewAccessor(&status.ClusterStatus{})
 }

+// createMockEndpoints creates mock endpoints and adds them to the fake clientset
+func createMockEndpoints(clientset *fake.Clientset) {
+	clientset.PrependReactor("get", "endpoints", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
+		return true, &corev1.Endpoints{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "cz-prom-agent-kube-state-metrics",
+				Namespace: "prom-agent",
+			},
+			Subsets: []corev1.EndpointSubset{
+				{
+					Addresses: []corev1.EndpointAddress{
+						{IP: "192.168.1.1"},
+					},
+					Ports: []corev1.EndpointPort{
+						{Name: "http", Port: 8080},
+					},
+				},
+			},
+		}, nil
+	})
+}
+
 func TestChecker_CheckOK(t *testing.T) {
 	cfg := &config.Settings{
 		Prometheus: config.Prometheus{
 			KubeStateMetricsServiceEndpoint: mockURL,
+			KubeMetrics:                     []string{"kube_pod_info", "kube_node_info"},
 		},
 	}
-	provider := kms.NewProvider(context.Background(), cfg)
+	clientset := fake.NewSimpleClientset()
+	createMockEndpoints(clientset)
+	provider := kms.NewProvider(context.Background(), cfg, clientset)

 	mock := test.NewHTTPMock()
-	mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil)
+	mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
 	client := mock.HTTPClient()

 	accessor := makeReport()
@@ -51,17 +82,22 @@
 	cfg := &config.Settings{
 		Prometheus: config.Prometheus{
 			KubeStateMetricsServiceEndpoint: mockURL,
+			KubeMetrics:                     []string{"kube_pod_info", "kube_node_info"},
 		},
 	}
-	provider := kms.NewProvider(context.Background(), cfg)
+	clientset := fake.NewSimpleClientset()
+	createMockEndpoints(clientset)
+	provider := kms.NewProvider(context.Background(), cfg, clientset)

-	// Update the test sleep interval to accellerate the test
+	// Update the test sleep interval to accelerate the test
 	kms.RetryInterval = 10 * time.Millisecond
+	kms.MaxRetry = 3
+
 	mock := test.NewHTTPMock()
-	for i := 0; i < kms.MaxRetry; i++ {
+	for i := 0; i < kms.MaxRetry-1; i++ {
 		mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
 	}
-	mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil)
+	mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
 	client := mock.HTTPClient()

 	accessor := makeReport()
@@ -81,16 +117,21 @@
 	cfg := &config.Settings{
 		Prometheus: config.Prometheus{
 			KubeStateMetricsServiceEndpoint: mockURL,
+			KubeMetrics:                     []string{"kube_pod_info", "kube_node_info"},
 		},
 	}
-	provider := kms.NewProvider(context.Background(), cfg)
+	clientset := fake.NewSimpleClientset()
+	createMockEndpoints(clientset)
+	provider := kms.NewProvider(context.Background(), cfg, clientset)

-	// Update the test sleep interval to accellerate the test
+	// Update the test sleep interval to accelerate the test
 	kms.RetryInterval = 10 * time.Millisecond
-	kms.MaxRetry = 0
+	kms.MaxRetry = 3

 	mock := test.NewHTTPMock()
-	mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
+	for i := 0; i < kms.MaxRetry; i++ {
+		mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
+	}
 	client := mock.HTTPClient()

 	accessor := makeReport()
@@ -105,3 +146,98 @@
 		}
 	})
 }
+
+func TestChecker_CheckMetricsValidation(t *testing.T) {
+	cfg := &config.Settings{
+		Prometheus: config.Prometheus{
+			KubeStateMetricsServiceEndpoint: mockURL,
+			KubeMetrics:                     []string{"kube_pod_info", "kube_node_info"},
+		},
+	}
+	clientset := fake.NewSimpleClientset()
+	createMockEndpoints(clientset)
+	provider := kms.NewProvider(context.Background(), cfg, clientset)
+
+	mock := test.NewHTTPMock()
+	mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
+	client := mock.HTTPClient()
+
+	accessor := makeReport()
+
+	err := provider.Check(context.Background(), client, accessor)
+	assert.NoError(t, err)
+
+	accessor.ReadFromReport(func(s *status.ClusterStatus) {
+		assert.Len(t, s.Checks, 1)
+		for _, c := range s.Checks {
+			assert.True(t, c.Passing)
+		}
+	})
+}
+
+func TestChecker_CheckHandles500Error(t *testing.T) {
+	cfg := &config.Settings{
+		Prometheus: config.Prometheus{
+			KubeStateMetricsServiceEndpoint: mockURL,
+			KubeMetrics:                     []string{"kube_pod_info", "kube_node_info"},
+		},
+	}
+	clientset := fake.NewSimpleClientset()
+	createMockEndpoints(clientset)
+	provider := kms.NewProvider(context.Background(), cfg, clientset)
+
+	mock := test.NewHTTPMock()
+	mock.Expect(http.MethodGet, "", http.StatusInternalServerError, nil)
+	client := mock.HTTPClient()
+
+	accessor := makeReport()
+
+	err := provider.Check(context.Background(), client, accessor)
+	assert.NoError(t, err)
+
+	accessor.ReadFromReport(func(s *status.ClusterStatus) {
+		assert.Len(t, s.Checks, 1)
+		for _, c := range s.Checks {
+			assert.False(t, c.Passing)
+		}
+	})
+}
+
+func TestChecker_CheckMissingMetrics(t *testing.T) {
+	cfg := &config.Settings{
+		Prometheus: config.Prometheus{
+			KubeStateMetricsServiceEndpoint: mockURL,
+			KubeMetrics:                     []string{"kube_pod_info", "kube_node_info", "missing_metric"},
+		},
+	}
+	clientset := fake.NewSimpleClientset()
+	createMockEndpoints(clientset)
+	provider := kms.NewProvider(context.Background(), cfg, clientset)
+
+	mock := test.NewHTTPMock()
+	mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
+	client := mock.HTTPClient()
+
+	accessor := makeReport()
+
+	err := provider.Check(context.Background(), client, accessor)
+	assert.NoError(t, err)
+
+	accessor.ReadFromReport(func(s *status.ClusterStatus) {
+		assert.Len(t, s.Checks, 2)
+		foundMissingMetricError := false
+		foundRetryError := false
+		for _, c := range s.Checks {
+			t.Logf("Check: %+v", c)
+			assert.False(t, c.Passing)
+			if strings.Contains(c.Error, "Required metric missing_metric not found") {
+				foundMissingMetricError = true
+			}
+			if strings.Contains(c.Error, "Failed to fetch metrics after 3 retries") {
+				foundRetryError = true
+			}
+		}
+		assert.True(t, foundMissingMetricError, "Expected error for missing metric not found")
+		assert.True(t, foundRetryError, "Expected error for failed retries not found")
+	})
+}
diff --git a/pkg/diagnostic/runner/runner_test.go b/pkg/diagnostic/runner/runner_test.go
index 9908945..73554fa 100644
--- a/pkg/diagnostic/runner/runner_test.go
+++ b/pkg/diagnostic/runner/runner_test.go
@@ -7,9 +7,12 @@ import (
 	"testing"

 	"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
+	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/catalog"
+	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/kms"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
 	"github.com/stretchr/testify/assert"
+	"k8s.io/client-go/kubernetes"
 )

 type mockProvider struct {
@@ -23,6 +26,15 @@ func (m *mockProvider) Check(ctx context.Context, client *http.Client, recorder
 	return nil
 }

+func NewMockKMSProvider(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider {
+	return &mockProvider{
+		Test: func(ctx context.Context, client *http.Client, recorder status.Accessor) error {
+			// Simulate a successful check
+			return nil
+		},
+	}
+}
+
 func TestRunner_Run_Error(t *testing.T) {
 	cfg := &config.Settings{
 		Deployment: config.Deployment{
@@ -32,6 +44,11 @@
 		},
 	}

+	// Use the mock provider for KMS
+	originalNewProvider := kms.NewProvider
+	kms.NewProvider = NewMockKMSProvider
+	defer func() { kms.NewProvider = originalNewProvider }()
+
 	reg := catalog.NewCatalog(context.Background(), cfg)

 	stage := config.ContextStageInit
@@ -65,6 +82,11 @@
 		},
 	}

+	// Use the mock provider for KMS
+	originalNewProvider := kms.NewProvider
+	kms.NewProvider = NewMockKMSProvider
+	defer func() { kms.NewProvider = originalNewProvider }()
+
 	reg := catalog.NewCatalog(context.Background(), cfg)

 	stage := config.ContextStageInit