From 82476ed3f9a87718a07b5ef0eaa49b8ff8a1e3c9 Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Fri, 22 Nov 2024 10:14:56 -0800 Subject: [PATCH 01/11] Modify KSM to check metrics --- pkg/diagnostic/kms/check.go | 127 ++++++++++++++++++++++++++----- pkg/diagnostic/kms/check_test.go | 111 ++++++++++++++++++++++++--- 2 files changed, 209 insertions(+), 29 deletions(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index 255ea3c..89532af 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -3,15 +3,20 @@ package kms import ( "context" "fmt" - net "net/http" + "io/ioutil" + "net/http" + "strings" "time" "github.com/cloudzero/cloudzero-agent-validator/pkg/config" "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic" - "github.com/cloudzero/cloudzero-agent-validator/pkg/http" "github.com/cloudzero/cloudzero-agent-validator/pkg/logging" "github.com/cloudzero/cloudzero-agent-validator/pkg/status" "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" ) const DiagnosticKMS = config.DiagnosticKMS @@ -23,46 +28,130 @@ var ( ) type checker struct { - cfg *config.Settings - logger *logrus.Entry + cfg *config.Settings + logger *logrus.Entry + clientset kubernetes.Interface } -func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider { +func NewProvider(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider { + var cs kubernetes.Interface + if len(clientset) > 0 { + cs = clientset[0] + } else { + // Use the in-cluster config if running inside a cluster, otherwise use the default kubeconfig + config, err := rest.InClusterConfig() + if err != nil { + kubeconfig := clientcmd.RecommendedHomeFile + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + panic(err.Error()) + } + } + + // Create the clientset + cs, err = kubernetes.NewForConfig(config) + if err != nil { + panic(err.Error()) + } + } + return &checker{ cfg: cfg, logger: logging.NewLogger(). WithContext(ctx).WithField(logging.OpField, "ksm"), + clientset: cs, } } -func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error { +func (c *checker) Check(ctx context.Context, client *http.Client, accessor status.Accessor) error { var ( - err error retriesRemaining = MaxRetry - url = fmt.Sprintf("%s/", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint) + namespace = "prom-agent" + serviceName = "cz-prom-agent-kube-state-metrics" + endpointURL string ) - // We need to build in a retry here because the kube-state-metrics service can take a few seconds to start up - // If it is deploying with the cloudzero-agent chart - for { - _, err = http.Do(ctx, client, net.MethodGet, nil, nil, url, nil) - if err == nil { - break + // Wait for the pod to become ready and find the first available endpoint + for retriesRemaining > 0 { + endpoints, err := c.clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{}) + if err != nil { + c.logger.Errorf("Failed to get service endpoints: %v", err) + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to get service endpoints: %s", err.Error())}) + return nil } - if retriesRemaining == 0 { + + // Log the endpoints for debugging + c.logger.Infof("Endpoints: %v", endpoints) + + // Check if there are any ready addresses and find the first available endpoint + for _, subset := range endpoints.Subsets { + for _, address := range subset.Addresses { + c.logger.Infof("Address: %v", address) + for _, port := range subset.Ports { + c.logger.Infof("Port: %v", port) + if port.Port == 8080 { + endpointURL = fmt.Sprintf("http://%s:%d/metrics", address.IP, port.Port) + break + } + } + if endpointURL != "" { + break + } + } + if endpointURL != "" { + break + } + } + + if endpointURL != "" { break } + c.logger.Infof("Pod is not ready, waiting...") retriesRemaining-- time.Sleep(RetryInterval) } - if err != nil { - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: err.Error()}) + if retriesRemaining == 0 { + c.logger.Errorf("Pod did not become ready in time") + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: "Pod did not become ready in time"}) return nil } - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true}) - return nil + c.logger.Infof("Using endpoint URL: %s", endpointURL) + + // Retry logic to handle transient issues + retriesRemaining = MaxRetry + for retriesRemaining > 0 { + resp, err := client.Get(endpointURL) + if err == nil && resp.StatusCode == http.StatusOK { + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + c.logger.Errorf("Failed to read metrics: %v", err) + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())}) + return nil + } + + metrics := string(body) + requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Add the required metrics here + for _, metric := range requiredMetrics { + if !strings.Contains(metrics, metric) { + c.logger.Errorf("Required metric %s not found", metric) + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)}) + return nil + } + } + + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true}) + return nil + } + c.logger.Errorf("Failed to fetch metrics: %v", err) + retriesRemaining-- + time.Sleep(RetryInterval) + } + + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)}) + return nil } diff --git a/pkg/diagnostic/kms/check_test.go b/pkg/diagnostic/kms/check_test.go index dcb1573..ae8be6a 100644 --- a/pkg/diagnostic/kms/check_test.go +++ b/pkg/diagnostic/kms/check_test.go @@ -7,6 +7,11 @@ import ( "time" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" "github.com/cloudzero/cloudzero-agent-validator/pkg/config" "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/kms" @@ -22,16 +27,40 @@ func makeReport() status.Accessor { return status.NewAccessor(&status.ClusterStatus{}) } +// createMockEndpoints creates mock endpoints and adds them to the fake clientset +func createMockEndpoints(clientset *fake.Clientset) { + clientset.PrependReactor("get", "endpoints", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cz-prom-agent-kube-state-metrics", + Namespace: "prom-agent", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + {IP: "192.168.1.1"}, + }, + Ports: []corev1.EndpointPort{ + {Name: "http", Port: 8080}, + }, + }, + }, + }, nil + }) +} + func TestChecker_CheckOK(t *testing.T) { cfg := &config.Settings{ Prometheus: config.Prometheus{ KubeStateMetricsServiceEndpoint: mockURL, }, } - provider := kms.NewProvider(context.Background(), cfg) + clientset := fake.NewSimpleClientset() + createMockEndpoints(clientset) + provider := kms.NewProvider(context.Background(), cfg, clientset) mock := test.NewHTTPMock() - mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil) + mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil) client := mock.HTTPClient() accessor := makeReport() @@ -53,15 +82,19 @@ func TestChecker_CheckRetry(t *testing.T) { KubeStateMetricsServiceEndpoint: mockURL, }, } - provider := kms.NewProvider(context.Background(), cfg) + clientset := fake.NewSimpleClientset() + createMockEndpoints(clientset) + provider := kms.NewProvider(context.Background(), cfg, clientset) - // Update the test sleep interval to accellerate the test + // Update the test sleep interval to accelerate the test kms.RetryInterval = 10 * time.Millisecond + kms.MaxRetry = 3 + mock := test.NewHTTPMock() - for i := 0; i < kms.MaxRetry; i++ { + for i := 0; i < kms.MaxRetry-1; i++ { mock.Expect(http.MethodGet, "", http.StatusNotFound, nil) } - mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil) + mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil) client := mock.HTTPClient() accessor := makeReport() @@ -83,14 +116,72 @@ func TestChecker_CheckRetryFailure(t *testing.T) { KubeStateMetricsServiceEndpoint: mockURL, }, } - provider := kms.NewProvider(context.Background(), cfg) + clientset := fake.NewSimpleClientset() + createMockEndpoints(clientset) + provider := kms.NewProvider(context.Background(), cfg, clientset) - // Update the test sleep interval to accellerate the test + // Update the test sleep interval to accelerate the test kms.RetryInterval = 10 * time.Millisecond - kms.MaxRetry = 0 + kms.MaxRetry = 3 + + mock := test.NewHTTPMock() + for i := 0; i < kms.MaxRetry; i++ { + mock.Expect(http.MethodGet, "", http.StatusNotFound, nil) + } + client := mock.HTTPClient() + + accessor := makeReport() + + err := provider.Check(context.Background(), client, accessor) + assert.NoError(t, err) + + accessor.ReadFromReport(func(s *status.ClusterStatus) { + assert.Len(t, s.Checks, 1) + for _, c := range s.Checks { + assert.False(t, c.Passing) + } + }) +} + +func TestChecker_CheckMetricsValidation(t *testing.T) { + cfg := &config.Settings{ + Prometheus: config.Prometheus{ + KubeStateMetricsServiceEndpoint: mockURL, + }, + } + clientset := fake.NewSimpleClientset() + createMockEndpoints(clientset) + provider := kms.NewProvider(context.Background(), cfg, clientset) + + mock := test.NewHTTPMock() + mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil) + client := mock.HTTPClient() + + accessor := makeReport() + + err := provider.Check(context.Background(), client, accessor) + assert.NoError(t, err) + + accessor.ReadFromReport(func(s *status.ClusterStatus) { + assert.Len(t, s.Checks, 1) + for _, c := range s.Checks { + assert.True(t, c.Passing) + } + }) +} + +func TestChecker_CheckHandles500Error(t *testing.T) { + cfg := &config.Settings{ + Prometheus: config.Prometheus{ + KubeStateMetricsServiceEndpoint: mockURL, + }, + } + clientset := fake.NewSimpleClientset() + createMockEndpoints(clientset) + provider := kms.NewProvider(context.Background(), cfg, clientset) mock := test.NewHTTPMock() - mock.Expect(http.MethodGet, "", http.StatusNotFound, nil) + mock.Expect(http.MethodGet, "", http.StatusInternalServerError, nil) client := mock.HTTPClient() accessor := makeReport() From 87dadb34178e3121abdf41c7c1e79390967ec35d Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Wed, 27 Nov 2024 12:18:03 -0800 Subject: [PATCH 02/11] fix runner tests --- pkg/diagnostic/kms/check.go | 2 +- pkg/diagnostic/runner/runner_test.go | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index 89532af..8e95ba9 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -33,7 +33,7 @@ type checker struct { clientset kubernetes.Interface } -func NewProvider(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider { +var NewProvider = func(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider { var cs kubernetes.Interface if len(clientset) > 0 { cs = clientset[0] diff --git a/pkg/diagnostic/runner/runner_test.go b/pkg/diagnostic/runner/runner_test.go index 9908945..73554fa 100644 --- a/pkg/diagnostic/runner/runner_test.go +++ b/pkg/diagnostic/runner/runner_test.go @@ -7,9 +7,12 @@ import ( "testing" "github.com/cloudzero/cloudzero-agent-validator/pkg/config" + "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic" "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/catalog" + "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/kms" "github.com/cloudzero/cloudzero-agent-validator/pkg/status" "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" ) type mockProvider struct { @@ -23,6 +26,15 @@ func (m *mockProvider) Check(ctx context.Context, client *http.Client, recorder return nil } +func NewMockKMSProvider(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider { + return &mockProvider{ + Test: func(ctx context.Context, client *http.Client, recorder status.Accessor) error { + // Simulate a successful check + return nil + }, + } +} + func TestRunner_Run_Error(t *testing.T) { cfg := &config.Settings{ Deployment: config.Deployment{ @@ -32,6 +44,11 @@ func TestRunner_Run_Error(t *testing.T) { }, } + // Use the mock provider for KMS + originalNewProvider := kms.NewProvider + kms.NewProvider = NewMockKMSProvider + defer func() { kms.NewProvider = originalNewProvider }() + reg := catalog.NewCatalog(context.Background(), cfg) stage := config.ContextStageInit @@ -65,6 +82,11 @@ func TestRunner_Run(t *testing.T) { }, } + // Use the mock provider for KMS + originalNewProvider := kms.NewProvider + kms.NewProvider = NewMockKMSProvider + defer func() { kms.NewProvider = originalNewProvider }() + reg := catalog.NewCatalog(context.Background(), cfg) stage := config.ContextStageInit From 6d789029c28e617a6a1628ccfb6e072e8c02f5b5 Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Wed, 27 Nov 2024 12:24:53 -0800 Subject: [PATCH 03/11] fix catalog tests --- pkg/diagnostic/catalog/catalog_test.go | 30 ++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pkg/diagnostic/catalog/catalog_test.go b/pkg/diagnostic/catalog/catalog_test.go index 9e32e9e..748f6a6 100644 --- a/pkg/diagnostic/catalog/catalog_test.go +++ b/pkg/diagnostic/catalog/catalog_test.go @@ -2,14 +2,34 @@ package catalog_test import ( "context" + "net/http" "testing" "github.com/cloudzero/cloudzero-agent-validator/pkg/config" + "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic" "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/catalog" + "github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic/kms" + "github.com/cloudzero/cloudzero-agent-validator/pkg/status" "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" ) +type mockProvider struct{} + +func (m *mockProvider) Check(ctx context.Context, client *http.Client, recorder status.Accessor) error { + return nil +} + +func mockKMSProvider(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider { + return &mockProvider{} +} + func TestRegistry_Get(t *testing.T) { + // Override kms.NewProvider + originalNewProvider := kms.NewProvider + kms.NewProvider = mockKMSProvider + defer func() { kms.NewProvider = originalNewProvider }() + ctx := context.Background() c := &config.Settings{} r := catalog.NewCatalog(ctx, c) @@ -28,6 +48,11 @@ func TestRegistry_Get(t *testing.T) { } func TestRegistry_Has(t *testing.T) { + // Override kms.NewProvider + originalNewProvider := kms.NewProvider + kms.NewProvider = mockKMSProvider + defer func() { kms.NewProvider = originalNewProvider }() + ctx := context.Background() c := &config.Settings{} r := catalog.NewCatalog(ctx, c) @@ -42,6 +67,11 @@ func TestRegistry_Has(t *testing.T) { } func TestRegistry_List(t *testing.T) { + // Override kms.NewProvider + originalNewProvider := kms.NewProvider + kms.NewProvider = mockKMSProvider + defer func() { kms.NewProvider = originalNewProvider }() + ctx := context.Background() c := &config.Settings{} r := catalog.NewCatalog(ctx, c) From 558dbb1956f0e5b7a4e5f3bd2f625c42b7440959 Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Wed, 27 Nov 2024 12:34:50 -0800 Subject: [PATCH 04/11] linting --- pkg/diagnostic/kms/check.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index 8e95ba9..267e67d 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -3,7 +3,7 @@ package kms import ( "context" "fmt" - "io/ioutil" + "io" "net/http" "strings" "time" @@ -126,7 +126,7 @@ func (c *checker) Check(ctx context.Context, client *http.Client, accessor statu resp, err := client.Get(endpointURL) if err == nil && resp.StatusCode == http.StatusOK { defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { c.logger.Errorf("Failed to read metrics: %v", err) accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())}) From ab8dc5bea64257fb668edbc2365767fcd7650bb7 Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Mon, 2 Dec 2024 14:58:24 -0800 Subject: [PATCH 05/11] better logging and use ksm internal endpoint --- pkg/diagnostic/kms/check.go | 61 +++---------------------------------- 1 file changed, 5 insertions(+), 56 deletions(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index 267e67d..83b0de9 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -13,7 +13,6 @@ import ( "github.com/cloudzero/cloudzero-agent-validator/pkg/logging" "github.com/cloudzero/cloudzero-agent-validator/pkg/status" "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -66,69 +65,19 @@ var NewProvider = func(ctx context.Context, cfg *config.Settings, clientset ...k func (c *checker) Check(ctx context.Context, client *http.Client, accessor status.Accessor) error { var ( retriesRemaining = MaxRetry - namespace = "prom-agent" - serviceName = "cz-prom-agent-kube-state-metrics" - endpointURL string + endpointURL = fmt.Sprintf("%s/metrics", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint) ) - // Wait for the pod to become ready and find the first available endpoint - for retriesRemaining > 0 { - endpoints, err := c.clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{}) - if err != nil { - c.logger.Errorf("Failed to get service endpoints: %v", err) - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to get service endpoints: %s", err.Error())}) - return nil - } - - // Log the endpoints for debugging - c.logger.Infof("Endpoints: %v", endpoints) - - // Check if there are any ready addresses and find the first available endpoint - for _, subset := range endpoints.Subsets { - for _, address := range subset.Addresses { - c.logger.Infof("Address: %v", address) - for _, port := range subset.Ports { - c.logger.Infof("Port: %v", port) - if port.Port == 8080 { - endpointURL = fmt.Sprintf("http://%s:%d/metrics", address.IP, port.Port) - break - } - } - if endpointURL != "" { - break - } - } - if endpointURL != "" { - break - } - } - - if endpointURL != "" { - break - } - - c.logger.Infof("Pod is not ready, waiting...") - retriesRemaining-- - time.Sleep(RetryInterval) - } - - if retriesRemaining == 0 { - c.logger.Errorf("Pod did not become ready in time") - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: "Pod did not become ready in time"}) - return nil - } - c.logger.Infof("Using endpoint URL: %s", endpointURL) // Retry logic to handle transient issues - retriesRemaining = MaxRetry - for retriesRemaining > 0 { + for attempt := 1; retriesRemaining > 0; attempt++ { resp, err := client.Get(endpointURL) if err == nil && resp.StatusCode == http.StatusOK { defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - c.logger.Errorf("Failed to read metrics: %v", err) + c.logger.Errorf("Failed to read metrics on attempt %d: %v", attempt, err) accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())}) return nil } @@ -137,7 +86,7 @@ func (c *checker) Check(ctx context.Context, client *http.Client, accessor statu requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Add the required metrics here for _, metric := range requiredMetrics { if !strings.Contains(metrics, metric) { - c.logger.Errorf("Required metric %s not found", metric) + c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt) accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)}) return nil } @@ -147,7 +96,7 @@ func (c *checker) Check(ctx context.Context, client *http.Client, accessor statu return nil } - c.logger.Errorf("Failed to fetch metrics: %v", err) + c.logger.Errorf("Failed to fetch metrics on attempt %d: %v", attempt, err) retriesRemaining-- time.Sleep(RetryInterval) } From b0f6196efaf1d160a69621920ce7bb3606552742 Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Mon, 2 Dec 2024 15:33:37 -0800 Subject: [PATCH 06/11] improve readability --- pkg/diagnostic/kms/check.go | 50 +++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index 83b0de9..8d787ac 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -73,32 +73,40 @@ func (c *checker) Check(ctx context.Context, client *http.Client, accessor statu // Retry logic to handle transient issues for attempt := 1; retriesRemaining > 0; attempt++ { resp, err := client.Get(endpointURL) - if err == nil && resp.StatusCode == http.StatusOK { - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - c.logger.Errorf("Failed to read metrics on attempt %d: %v", attempt, err) - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())}) - return nil - } + if err != nil { + c.logger.Errorf("Failed to fetch metrics on attempt %d: %v", attempt, err) + retriesRemaining-- + time.Sleep(RetryInterval) + continue + } - metrics := string(body) - requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Add the required metrics here - for _, metric := range requiredMetrics { - if !strings.Contains(metrics, metric) { - c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt) - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)}) - return nil - } - } + if resp.StatusCode != http.StatusOK { + c.logger.Errorf("Unexpected status code on attempt %d: %d", attempt, resp.StatusCode) + retriesRemaining-- + time.Sleep(RetryInterval) + continue + } - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true}) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + c.logger.Errorf("Failed to read metrics on attempt %d: %v", attempt, err) + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())}) return nil } - c.logger.Errorf("Failed to fetch metrics on attempt %d: %v", attempt, err) - retriesRemaining-- - time.Sleep(RetryInterval) + metrics := string(body) + requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Add the required metrics here + for _, metric := range requiredMetrics { + if !strings.Contains(metrics, metric) { + c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt) + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)}) + return nil + } + } + + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true}) + return nil } accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)}) From dc3936ea908d581f08994157a69eb393f8528fff Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Mon, 2 Dec 2024 15:38:22 -0800 Subject: [PATCH 07/11] linting --- pkg/diagnostic/kms/check.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index 8d787ac..6a31077 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -62,7 +62,7 @@ var NewProvider = func(ctx context.Context, cfg *config.Settings, clientset ...k } } -func (c *checker) Check(ctx context.Context, client *http.Client, accessor status.Accessor) error { +func (c *checker) Check(_ context.Context, client *http.Client, accessor status.Accessor) error { var ( retriesRemaining = MaxRetry endpointURL = fmt.Sprintf("%s/metrics", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint) From 14b87d49120507fa138c7085e446ee64d2d01b7f Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Tue, 3 Dec 2024 12:23:39 -0800 Subject: [PATCH 08/11] add support for full kube metrics list --- pkg/config/prometheus.go | 6 ++++++ pkg/config/prometheus_test.go | 11 +++++++++++ pkg/config/testdata/cloudzero-agent-validator.yml | 8 ++++++++ 3 files changed, 25 insertions(+) diff --git a/pkg/config/prometheus.go b/pkg/config/prometheus.go index 1a39192..483157a 100644 --- a/pkg/config/prometheus.go +++ b/pkg/config/prometheus.go @@ -11,6 +11,7 @@ type Prometheus struct { Executable string `yaml:"executable" default:"/bin/prometheus" env:"PROMETHEUS_EXECUTABLE" env-description:"Prometheus Executable Path"` KubeStateMetricsServiceEndpoint string `yaml:"kube_state_metrics_service_endpoint" env:"KMS_EP_URL" required:"true" env-description:"Kube State Metrics Service Endpoint"` Configurations []string `yaml:"configurations"` + KubeMetrics []string `yaml:"kube_metrics"` } func (s *Prometheus) Validate() error { @@ -43,5 +44,10 @@ func (s *Prometheus) Validate() error { } s.Configurations = cleanedPaths } + + if len(s.KubeMetrics) == 0 { + return errors.New("no KubeMetrics provided") + } + return nil } diff --git a/pkg/config/prometheus_test.go b/pkg/config/prometheus_test.go index efabb40..b809887 100644 --- a/pkg/config/prometheus_test.go +++ b/pkg/config/prometheus_test.go @@ -23,6 +23,7 @@ func TestPrometheus_Validate(t *testing.T) { prom: config.Prometheus{ KubeStateMetricsServiceEndpoint: kmsServiceEndpoint, Configurations: []string{scrapeConfigFile}, + KubeMetrics: []string{"kube_node_info", "kube_pod_info"}, }, expected: nil, }, @@ -30,6 +31,7 @@ func TestPrometheus_Validate(t *testing.T) { name: "MissingKubeStateMetricsServiceEndpoint", prom: config.Prometheus{ Configurations: []string{scrapeConfigFile}, + KubeMetrics: []string{"kube_node_info", "kube_pod_info"}, }, expected: errors.New(config.ErrNoKubeStateMetricsServiceEndpointMsg), }, @@ -37,9 +39,18 @@ func TestPrometheus_Validate(t *testing.T) { name: "MissingScrapeConfigLocation", prom: config.Prometheus{ KubeStateMetricsServiceEndpoint: kmsServiceEndpoint, + KubeMetrics: []string{"kube_node_info", "kube_pod_info"}, }, expected: nil, }, + { + name: "MissingKubeMetrics", + prom: config.Prometheus{ + KubeStateMetricsServiceEndpoint: kmsServiceEndpoint, + Configurations: []string{scrapeConfigFile}, + }, + expected: errors.New("no KubeMetrics provided"), + }, } for _, tt := range tests { diff --git a/pkg/config/testdata/cloudzero-agent-validator.yml b/pkg/config/testdata/cloudzero-agent-validator.yml index 26053b3..181323f 100644 --- a/pkg/config/testdata/cloudzero-agent-validator.yml +++ b/pkg/config/testdata/cloudzero-agent-validator.yml @@ -18,6 +18,14 @@ cloudzero: prometheus: kube_state_metrics_service_endpoint: http://kube-state-metrics:8080 + kube_metrics: + - kube_node_info + - kube_node_status_capacity + - kube_pod_container_resource_limits + - kube_pod_container_resource_requests + - kube_pod_labels + - kube_pod_info + - node_dmi_info configurations: - prometheus.yml From e68ec94763e85ee696fa34e48d63c180ea237fd1 Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Tue, 3 Dec 2024 12:29:44 -0800 Subject: [PATCH 09/11] use metric list during check --- pkg/diagnostic/kms/check.go | 2 +- pkg/diagnostic/kms/check_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index 6a31077..d7e2826 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -96,7 +96,7 @@ func (c *checker) Check(_ context.Context, client *http.Client, accessor status. } metrics := string(body) - requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Add the required metrics here + requiredMetrics := c.cfg.Prometheus.KubeMetrics // Use the required metrics from the configuration for _, metric := range requiredMetrics { if !strings.Contains(metrics, metric) { c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt) diff --git a/pkg/diagnostic/kms/check_test.go b/pkg/diagnostic/kms/check_test.go index ae8be6a..2ca6bc1 100644 --- a/pkg/diagnostic/kms/check_test.go +++ b/pkg/diagnostic/kms/check_test.go @@ -53,6 +53,7 @@ func TestChecker_CheckOK(t *testing.T) { cfg := &config.Settings{ Prometheus: config.Prometheus{ KubeStateMetricsServiceEndpoint: mockURL, + KubeMetrics: []string{"kube_pod_info", "kube_node_info"}, }, } clientset := fake.NewSimpleClientset() @@ -80,6 +81,7 @@ func TestChecker_CheckRetry(t *testing.T) { cfg := &config.Settings{ Prometheus: config.Prometheus{ KubeStateMetricsServiceEndpoint: mockURL, + KubeMetrics: []string{"kube_pod_info", "kube_node_info"}, }, } clientset := fake.NewSimpleClientset() @@ -114,6 +116,7 @@ func TestChecker_CheckRetryFailure(t *testing.T) { cfg := &config.Settings{ Prometheus: config.Prometheus{ KubeStateMetricsServiceEndpoint: mockURL, + KubeMetrics: []string{"kube_pod_info", "kube_node_info"}, }, } clientset := fake.NewSimpleClientset() @@ -147,6 +150,7 @@ func TestChecker_CheckMetricsValidation(t *testing.T) { cfg := &config.Settings{ Prometheus: config.Prometheus{ KubeStateMetricsServiceEndpoint: mockURL, + KubeMetrics: []string{"kube_pod_info", "kube_node_info"}, }, } clientset := fake.NewSimpleClientset() @@ -174,6 +178,7 @@ func TestChecker_CheckHandles500Error(t *testing.T) { cfg := &config.Settings{ Prometheus: config.Prometheus{ KubeStateMetricsServiceEndpoint: mockURL, + KubeMetrics: []string{"kube_pod_info", "kube_node_info"}, }, } clientset := fake.NewSimpleClientset() From 59de324ea9e4488dd12280a3cb5a4575933b5336 Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Tue, 3 Dec 2024 13:42:37 -0800 Subject: [PATCH 10/11] better handling of missing metrics --- pkg/diagnostic/kms/check.go | 15 ++++++++---- pkg/diagnostic/kms/check_test.go | 40 ++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index d7e2826..f1580c3 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -96,17 +96,22 @@ func (c *checker) Check(_ context.Context, client *http.Client, accessor status. } metrics := string(body) - requiredMetrics := c.cfg.Prometheus.KubeMetrics // Use the required metrics from the configuration - for _, metric := range requiredMetrics { + allMetricsFound := true + for _, metric := range c.cfg.Prometheus.KubeMetrics { if !strings.Contains(metrics, metric) { c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt) accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)}) - return nil + allMetricsFound = false } } - accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true}) - return nil + if allMetricsFound { + accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true}) + return nil + } + + retriesRemaining-- + time.Sleep(RetryInterval) } accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)}) diff --git a/pkg/diagnostic/kms/check_test.go b/pkg/diagnostic/kms/check_test.go index 2ca6bc1..719bfc9 100644 --- a/pkg/diagnostic/kms/check_test.go +++ b/pkg/diagnostic/kms/check_test.go @@ -3,6 +3,7 @@ package kms_test import ( "context" "net/http" + "strings" "testing" "time" @@ -201,3 +202,42 @@ func TestChecker_CheckHandles500Error(t *testing.T) { } }) } + +func TestChecker_CheckMissingMetrics(t *testing.T) { + cfg := &config.Settings{ + Prometheus: config.Prometheus{ + KubeStateMetricsServiceEndpoint: mockURL, + KubeMetrics: []string{"kube_pod_info", "kube_node_info", "missing_metric"}, + }, + } + clientset := fake.NewSimpleClientset() + createMockEndpoints(clientset) + provider := kms.NewProvider(context.Background(), cfg, clientset) + + mock := test.NewHTTPMock() + mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil) + client := mock.HTTPClient() + + accessor := makeReport() + + err := provider.Check(context.Background(), client, accessor) + assert.NoError(t, err) + + accessor.ReadFromReport(func(s *status.ClusterStatus) { + assert.Len(t, s.Checks, 2) + foundMissingMetricError := false + foundRetryError := false + for _, c := range s.Checks { + t.Logf("Check: %+v", c) + assert.False(t, c.Passing) + if strings.Contains(c.Error, "Required metric missing_metric not found") { + foundMissingMetricError = true + } + if strings.Contains(c.Error, "Failed to fetch metrics after 3 retries") { + foundRetryError = true + } + } + assert.True(t, foundMissingMetricError, "Expected error for missing metric not found") + assert.True(t, foundRetryError, "Expected error for failed retries not found") + }) +} From 6ab217a4fdb48feca3e4fc8e3acb7906a201caca Mon Sep 17 00:00:00 2001 From: Brian Drennan Date: Wed, 4 Dec 2024 10:51:25 -0800 Subject: [PATCH 11/11] add better logging --- pkg/diagnostic/kms/check.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go index f1580c3..c6ac5ca 100644 --- a/pkg/diagnostic/kms/check.go +++ b/pkg/diagnostic/kms/check.go @@ -98,7 +98,9 @@ func (c *checker) Check(_ context.Context, client *http.Client, accessor status. metrics := string(body) allMetricsFound := true for _, metric := range c.cfg.Prometheus.KubeMetrics { - if !strings.Contains(metrics, metric) { + if strings.Contains(metrics, metric) { + c.logger.Infof("Found required metric %s on attempt %d", metric, attempt) + } else { c.logger.Errorf("Required metric %s not found on attempt %d", metric, attempt) accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)}) allMetricsFound = false