diff --git a/pkg/diagnostic/kms/check.go b/pkg/diagnostic/kms/check.go
index 255ea3c..3b4b3e2 100644
--- a/pkg/diagnostic/kms/check.go
+++ b/pkg/diagnostic/kms/check.go
@@ -3,18 +3,24 @@ package kms
 import (
 	"context"
 	"fmt"
+	"io"
 	net "net/http"
+	"strings"
 	"time"
 
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
-	"github.com/cloudzero/cloudzero-agent-validator/pkg/http"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/logging"
 	"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
 	"github.com/sirupsen/logrus"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
 )
 
 const DiagnosticKMS = config.DiagnosticKMS
+const DiagnosticKMSMetrics = "DiagnosticKMSMetrics"
 
 var (
 	// Exported so that it can be overridden in tests
@@ -23,46 +29,126 @@ var (
 )
 
 type checker struct {
-	cfg    *config.Settings
-	logger *logrus.Entry
+	cfg       *config.Settings
+	logger    *logrus.Entry
+	clientset *kubernetes.Clientset
 }
 
 func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider {
+	// Use the in-cluster config if running inside a cluster, otherwise fall back to the default kubeconfig
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		kubeconfig := clientcmd.RecommendedHomeFile
+		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+		if err != nil {
+			panic(err.Error())
+		}
+	}
+
+	// Create the clientset
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		panic(err.Error())
+	}
+
 	return &checker{
 		cfg: cfg,
 		logger: logging.NewLogger().
 			WithContext(ctx).WithField(logging.OpField, "ksm"),
+		clientset: clientset,
 	}
 }
 
 func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error {
 	var (
-		err              error
 		retriesRemaining = MaxRetry
-		url              = fmt.Sprintf("%s/", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint)
+		namespace        = "prom-agent"
+		serviceName      = "cz-prom-agent-kube-state-metrics"
+		endpointURL      string
 	)
 
-	// We need to build in a retry here because the kube-state-metrics service can take a few seconds to start up
-	// If it is deploying with the cloudzero-agent chart
-	for {
-		_, err = http.Do(ctx, client, net.MethodGet, nil, nil, url, nil)
-		if err == nil {
-			break
+	// Wait for the pod to become ready and find the first available endpoint
+	for retriesRemaining > 0 {
+		endpoints, err := c.clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
+		if err != nil {
+			c.logger.Errorf("Failed to get service endpoints: %v", err)
+			accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to get service endpoints: %s", err.Error())})
+			return nil
+		}
+
+		// Log the endpoints for debugging
+		c.logger.Infof("Endpoints: %v", endpoints)
+
+		// Check if there are any ready addresses and find the first available endpoint
+		for _, subset := range endpoints.Subsets {
+			for _, address := range subset.Addresses {
+				c.logger.Infof("Address: %v", address)
+				for _, port := range subset.Ports {
+					c.logger.Infof("Port: %v", port)
+					if port.Port == 8080 {
+						endpointURL = fmt.Sprintf("http://%s:%d/metrics", address.IP, port.Port)
+						break
+					}
+				}
+				if endpointURL != "" {
+					break
+				}
+			}
+			if endpointURL != "" {
+				break
+			}
 		}
-		if retriesRemaining == 0 {
+
+		if endpointURL != "" {
 			break
 		}
+		c.logger.Infof("Pod is not ready, waiting...")
 		retriesRemaining--
 		time.Sleep(RetryInterval)
 	}
 
-	if err != nil {
-		accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: err.Error()})
+	if retriesRemaining == 0 {
+		c.logger.Errorf("Pod did not become ready in time")
+		accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: "Pod did not become ready in time"})
 		return nil
 	}
 
-	accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
-	return nil
+	c.logger.Infof("Using endpoint URL: %s", endpointURL)
+
+	// Retry logic to handle transient issues
+	retriesRemaining = MaxRetry
+	for retriesRemaining > 0 {
+		resp, err := client.Get(endpointURL)
+		if err == nil && resp.StatusCode == net.StatusOK {
+			defer resp.Body.Close()
+			body, err := io.ReadAll(resp.Body)
+			if err != nil {
+				c.logger.Errorf("Failed to read metrics: %v", err)
+				accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())})
+				return nil
+			}
+
+			metrics := string(body)
+			requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Metrics that must be present in the scrape output; extend as needed
+			for _, metric := range requiredMetrics {
+				if !strings.Contains(metrics, metric) {
+					c.logger.Errorf("Required metric %s not found", metric)
+					accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)})
+					return nil
+				}
+			}
+
+			accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
+			accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: true})
+			return nil
+		}
+		c.logger.Errorf("Failed to fetch metrics: %v", err)
+		retriesRemaining--
+		time.Sleep(RetryInterval)
+	}
+
+	accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)})
+	return nil
 }
diff --git a/pkg/diagnostic/kms/check_test.go b/pkg/diagnostic/kms/check_test.go
index dcb1573..0c1e868 100644
--- a/pkg/diagnostic/kms/check_test.go
+++ b/pkg/diagnostic/kms/check_test.go
@@ -31,7 +31,7 @@ func TestChecker_CheckOK(t *testing.T) {
 	provider := kms.NewProvider(context.Background(), cfg)
 
 	mock := test.NewHTTPMock()
-	mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil)
+	mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
 	client := mock.HTTPClient()
 
 	accessor := makeReport()
@@ -40,7 +40,7 @@
 	assert.NoError(t, err)
 
 	accessor.ReadFromReport(func(s *status.ClusterStatus) {
-		assert.Len(t, s.Checks, 1)
+		assert.Len(t, s.Checks, 2)
 		for _, c := range s.Checks {
 			assert.True(t, c.Passing)
 		}
 	})
@@ -55,13 +55,15 @@ func TestChecker_CheckRetry(t *testing.T) {
 	}
 	provider := kms.NewProvider(context.Background(), cfg)
 
-	// Update the test sleep interval to accellerate the test
+	// Update the test sleep interval to accelerate the test
 	kms.RetryInterval = 10 * time.Millisecond
+	kms.MaxRetry = 3
+
 	mock := test.NewHTTPMock()
-	for i := 0; i < kms.MaxRetry; i++ {
+	for i := 0; i < kms.MaxRetry-1; i++ {
 		mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
 	}
-	mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil)
+	mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
 	client := mock.HTTPClient()
 
 	accessor := makeReport()
@@ -70,7 +72,7 @@
 	err := provider.Check(context.Background(), client, accessor)
 	assert.NoError(t, err)
 
 	accessor.ReadFromReport(func(s *status.ClusterStatus) {
-		assert.Len(t, s.Checks, 1)
+		assert.Len(t, s.Checks, 2)
 		for _, c := range s.Checks {
 			assert.True(t, c.Passing)
 		}
@@ -85,12 +87,39 @@ func TestChecker_CheckRetryFailure(t *testing.T) {
 	}
 	provider := kms.NewProvider(context.Background(), cfg)
 
-	// Update the test sleep interval to accellerate the test
+	// Update the test sleep interval to accelerate the test
 	kms.RetryInterval = 10 * time.Millisecond
-	kms.MaxRetry = 0
+	kms.MaxRetry = 3
+
+	mock := test.NewHTTPMock()
+	for i := 0; i < kms.MaxRetry; i++ {
+		mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
+	}
+	client := mock.HTTPClient()
+
+	accessor := makeReport()
+
+	err := provider.Check(context.Background(), client, accessor)
+	assert.NoError(t, err)
+
+	accessor.ReadFromReport(func(s *status.ClusterStatus) {
+		assert.Len(t, s.Checks, 1)
+		for _, c := range s.Checks {
+			assert.False(t, c.Passing)
+		}
+	})
+}
+
+func TestChecker_CheckMetricsValidation(t *testing.T) {
+	cfg := &config.Settings{
+		Prometheus: config.Prometheus{
+			KubeStateMetricsServiceEndpoint: mockURL,
+		},
+	}
+	provider := kms.NewProvider(context.Background(), cfg)
 
 	mock := test.NewHTTPMock()
-	mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
+	mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
 	client := mock.HTTPClient()
 
 	accessor := makeReport()
@@ -98,6 +127,59 @@ func TestChecker_CheckRetryFailure(t *testing.T) {
 	err := provider.Check(context.Background(), client, accessor)
 	assert.NoError(t, err)
 
+	accessor.ReadFromReport(func(s *status.ClusterStatus) {
+		assert.Len(t, s.Checks, 2)
+		for _, c := range s.Checks {
+			assert.True(t, c.Passing)
+		}
+	})
+}
+
+func TestChecker_CheckHandles500Error(t *testing.T) {
+	cfg := &config.Settings{
+		Prometheus: config.Prometheus{
+			KubeStateMetricsServiceEndpoint: mockURL,
+		},
+	}
+	provider := kms.NewProvider(context.Background(), cfg)
+
+	mock := test.NewHTTPMock()
+	mock.Expect(http.MethodGet, "", http.StatusInternalServerError, nil)
+	client := mock.HTTPClient()
+
+	accessor := makeReport()
+
+	err := provider.Check(context.Background(), client, accessor)
+	assert.NoError(t, err)
+
+	accessor.ReadFromReport(func(s *status.ClusterStatus) {
+		assert.Len(t, s.Checks, 1)
+		for _, c := range s.Checks {
+			assert.False(t, c.Passing)
+		}
+	})
+}
+
+func TestChecker_CheckHandlesTimeout(t *testing.T) {
+	cfg := &config.Settings{
+		Prometheus: config.Prometheus{
+			KubeStateMetricsServiceEndpoint: mockURL,
+		},
+	}
+	provider := kms.NewProvider(context.Background(), cfg)
+
+	mock := test.NewHTTPMock()
+	mock.Expect(http.MethodGet, "", http.StatusOK, nil)
+	client := mock.HTTPClient()
+
+	accessor := makeReport()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+
+	err := provider.Check(ctx, client, accessor)
+	assert.NoError(t, err)
+
 	accessor.ReadFromReport(func(s *status.ClusterStatus) {
 		assert.Len(t, s.Checks, 1)
 		for _, c := range s.Checks {