Modify KSM to check metrics
bdrennz committed Nov 25, 2024
1 parent 33d5ebb commit f84be9a
Showing 2 changed files with 193 additions and 25 deletions.
118 changes: 102 additions & 16 deletions pkg/diagnostic/kms/check.go
@@ -3,18 +3,24 @@ package kms
import (
"context"
"fmt"
"io/ioutil"

Check failure on line 6 in pkg/diagnostic/kms/check.go (GitHub Actions / lint):
SA1019: "io/ioutil" has been deprecated since Go 1.19: as of Go 1.16, the same functionality is provided by package io or package os, and those implementations should be preferred in new code. See the specific function documentation for details. (staticcheck) A hedged replacement sketch follows this file's diff below.
net "net/http"
"strings"
"time"

"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
"github.com/cloudzero/cloudzero-agent-validator/pkg/http"
"github.com/cloudzero/cloudzero-agent-validator/pkg/logging"
"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const DiagnosticKMS = config.DiagnosticKMS
const DiagnosticKMSMetrics = "DiagnosticKMSMetrics"

var (
// Exported so that it can be overridden in tests
@@ -23,46 +29,126 @@
)

type checker struct {
cfg *config.Settings
logger *logrus.Entry
clientset *kubernetes.Clientset
}

func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider {
// Use the in-cluster config if running inside a cluster, otherwise use the default kubeconfig
config, err := rest.InClusterConfig()
if err != nil {
kubeconfig := clientcmd.RecommendedHomeFile
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
}

// Create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}

return &checker{
cfg: cfg,
logger: logging.NewLogger().
WithContext(ctx).WithField(logging.OpField, "ksm"),
clientset: clientset,
}
}

func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error {
var (
retriesRemaining = MaxRetry
namespace = "prom-agent"
serviceName = "cz-prom-agent-kube-state-metrics"
endpointURL string
)

// The kube-state-metrics service can take a few seconds to start up when it is
// deployed with the cloudzero-agent chart, so retry while waiting for it.
// Wait for the pod to become ready and find the first available endpoint
for retriesRemaining > 0 {
endpoints, err := c.clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
if err != nil {
c.logger.Errorf("Failed to get service endpoints: %v", err)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to get service endpoints: %s", err.Error())})
return nil
}

// Log the endpoints for debugging
c.logger.Infof("Endpoints: %v", endpoints)

// Check if there are any ready addresses and find the first available endpoint
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
c.logger.Infof("Address: %v", address)
for _, port := range subset.Ports {
c.logger.Infof("Port: %v", port)
if port.Port == 8080 {
endpointURL = fmt.Sprintf("http://%s:%d/metrics", address.IP, port.Port)
break
}
}
if endpointURL != "" {
break
}
}
if endpointURL != "" {
break
}
}
if endpointURL != "" {
break
}

c.logger.Infof("Pod is not ready, waiting...")
retriesRemaining--
time.Sleep(RetryInterval)
}

if retriesRemaining == 0 {
c.logger.Errorf("Pod did not become ready in time")
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: "Pod did not become ready in time"})
return nil
}

c.logger.Infof("Using endpoint URL: %s", endpointURL)

// Retry logic to handle transient issues
retriesRemaining = MaxRetry
for retriesRemaining > 0 {
resp, err := client.Get(endpointURL)
if err == nil && resp.StatusCode == net.StatusOK {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
c.logger.Errorf("Failed to read metrics: %v", err)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())})
return nil
}

metrics := string(body)
requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Add the required metrics here
for _, metric := range requiredMetrics {
if !strings.Contains(metrics, metric) {
c.logger.Errorf("Required metric %s not found", metric)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)})
return nil
}
}

accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: true})
return nil
}

c.logger.Errorf("Failed to fetch metrics: %v", err)
retriesRemaining--
time.Sleep(RetryInterval)
}

accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMSMetrics, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)})
return nil
}
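A hedged follow-up for the SA1019 lint failure noted above (not part of this commit): io.ReadAll has been the drop-in replacement for the deprecated ioutil.ReadAll since Go 1.16, so the body-reading step could look roughly like the sketch below. The fetchMetrics helper name is illustrative only.

package kms

import (
	"fmt"
	"io"
	net "net/http"
)

// fetchMetrics is an illustrative sketch (not in the commit) of the metrics
// fetch using io.ReadAll in place of the deprecated ioutil.ReadAll.
func fetchMetrics(client *net.Client, url string) (string, error) {
	resp, err := client.Get(url)
	if err != nil {
		return "", fmt.Errorf("fetch metrics: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("read metrics: %w", err)
	}
	return string(body), nil
}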
100 changes: 91 additions & 9 deletions pkg/diagnostic/kms/check_test.go
@@ -31,7 +31,7 @@ func TestChecker_CheckOK(t *testing.T) {
provider := kms.NewProvider(context.Background(), cfg)

mock := test.NewHTTPMock()
mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil)
mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
client := mock.HTTPClient()

accessor := makeReport()
Expand All @@ -40,7 +40,7 @@ func TestChecker_CheckOK(t *testing.T) {
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
assert.Len(t, s.Checks, 2)
for _, c := range s.Checks {
assert.True(t, c.Passing)
}
@@ -55,13 +55,15 @@ func TestChecker_CheckRetry(t *testing.T) {
}
provider := kms.NewProvider(context.Background(), cfg)

// Update the test sleep interval to accelerate the test
kms.RetryInterval = 10 * time.Millisecond
kms.MaxRetry = 3

mock := test.NewHTTPMock()
for i := 0; i < kms.MaxRetry; i++ {
for i := 0; i < kms.MaxRetry-1; i++ {
mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
}
mock.Expect(http.MethodGet, "Hello World", http.StatusOK, nil)
mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
client := mock.HTTPClient()

accessor := makeReport()
@@ -70,7 +72,7 @@
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
assert.Len(t, s.Checks, 2)
for _, c := range s.Checks {
assert.True(t, c.Passing)
}
@@ -85,19 +87,99 @@ func TestChecker_CheckRetryFailure(t *testing.T) {
}
provider := kms.NewProvider(context.Background(), cfg)

// Update the test sleep interval to accelerate the test
kms.RetryInterval = 10 * time.Millisecond
kms.MaxRetry = 3

mock := test.NewHTTPMock()
for i := 0; i < kms.MaxRetry; i++ {
mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
}
client := mock.HTTPClient()

accessor := makeReport()

err := provider.Check(context.Background(), client, accessor)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
assert.Len(t, s.Checks, 1)
for _, c := range s.Checks {
assert.False(t, c.Passing)
}
})
}

func TestChecker_CheckMetricsValidation(t *testing.T) {
cfg := &config.Settings{
Prometheus: config.Prometheus{
KubeStateMetricsServiceEndpoint: mockURL,
},
}
provider := kms.NewProvider(context.Background(), cfg)

mock := test.NewHTTPMock()
mock.Expect(http.MethodGet, "", http.StatusNotFound, nil)
mock.Expect(http.MethodGet, "kube_pod_info\nkube_node_info\n", http.StatusOK, nil)
client := mock.HTTPClient()

accessor := makeReport()

err := provider.Check(context.Background(), client, accessor)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
assert.Len(t, s.Checks, 2)
for _, c := range s.Checks {
assert.True(t, c.Passing)
}
})
}

func TestChecker_CheckHandles500Error(t *testing.T) {
cfg := &config.Settings{
Prometheus: config.Prometheus{
KubeStateMetricsServiceEndpoint: mockURL,
},
}
provider := kms.NewProvider(context.Background(), cfg)

mock := test.NewHTTPMock()
mock.Expect(http.MethodGet, "", http.StatusInternalServerError, nil)
client := mock.HTTPClient()

accessor := makeReport()

err := provider.Check(context.Background(), client, accessor)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
assert.Len(t, s.Checks, 1)
for _, c := range s.Checks {
assert.False(t, c.Passing)
}
})
}

func TestChecker_CheckHandlesTimeout(t *testing.T) {
cfg := &config.Settings{
Prometheus: config.Prometheus{
KubeStateMetricsServiceEndpoint: mockURL,
},
}
provider := kms.NewProvider(context.Background(), cfg)

mock := test.NewHTTPMock()
mock.Expect(http.MethodGet, "", http.StatusOK, nil)
client := mock.HTTPClient()

accessor := makeReport()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := provider.Check(ctx, client, accessor)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
assert.Len(t, s.Checks, 1)
for _, c := range s.Checks {
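A caveat on these tests, not addressed in this commit: NewProvider builds a real Kubernetes clientset (and panics when neither an in-cluster config nor a kubeconfig is available), and the new endpoint-discovery path reads the prom-agent/cz-prom-agent-kube-state-metrics Endpoints object, which the HTTP mock alone cannot satisfy. Below is a minimal sketch of how that lookup could be faked with client-go's fake clientset; it assumes the checker accepted a kubernetes.Interface (rather than the concrete *kubernetes.Clientset the commit uses) so the fake could be injected.

package kms_test

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// fakeKSMClientset is an illustrative helper (not in the commit) returning a
// fake clientset pre-populated with an Endpoints object matching the
// namespace, service name, and port the checker scans for. It assumes the
// checker can accept a kubernetes.Interface so the fake can be injected.
func fakeKSMClientset() *fake.Clientset {
	ep := &corev1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "cz-prom-agent-kube-state-metrics",
			Namespace: "prom-agent",
		},
		Subsets: []corev1.EndpointSubset{{
			Addresses: []corev1.EndpointAddress{{IP: "10.0.0.1"}},
			Ports:     []corev1.EndpointPort{{Port: 8080}},
		}},
	}
	return fake.NewSimpleClientset(ep)
}

With an injectable interface, a test could pair this fake with the existing HTTP mock so that both the endpoint lookup and the metrics fetch are exercised offline.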