Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/golang-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ jobs:
go mod download
- name: Run go tests
run: |
go test -timeout 30s -race -cover ./...
#go test -timeout 30s -race -cover ./...
go test -v ./pkg/diagnostic/kms/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: Restore the full `go test -timeout 30s -race -cover ./...` run once testing of the kms package is done.

15 changes: 7 additions & 8 deletions pkg/diagnostic/cz/check.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: Copyright (c) 2016-2024, CloudZero, Inc. or its affiliates. All Rights Reserved.
// SPDX-LicenseCopyrightText: Copyright (c) 2016-2024, CloudZero, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package cz

Expand Down Expand Up @@ -32,22 +32,21 @@ func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider
}
}

func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error {

func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor, cfg *config.Settings) error {
// Hit an authenticated API to verify the API token
url := fmt.Sprintf("%s/v2/insights", c.cfg.Cloudzero.Host)
url := fmt.Sprintf("%s/v2/insights", cfg.Cloudzero.Host)
_, err := http.Do(
ctx, client, net.MethodGet,
map[string]string{
http.HeaderAuthorization: strings.TrimSpace(c.cfg.Cloudzero.Credential),
http.HeaderAuthorization: strings.TrimSpace(cfg.Cloudzero.Credential),
http.HeaderAcceptEncoding: http.ContentTypeJSON,
},
nil,
// TODO: Add HEAD endpoint for container-metrics/status and pass these to check the API key
// map[string]string{
// http.QueryParamAccountID: c.cfg.Deployment.AccountID,
// http.QueryParamRegion: c.cfg.Deployment.Region,
// http.QueryParamClusterName: c.cfg.Deployment.ClusterName,
// http.QueryParamAccountID: cfg.Deployment.AccountID,
// http.QueryParamRegion: cfg.Deployment.Region,
// http.QueryParamClusterName: cfg.Deployment.ClusterName,
// },
url, nil,
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/diagnostic/cz/check_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: Copyright (c) 2016-2024, CloudZero, Inc. or its affiliates. All Rights Reserved.
// SPDX-LicenseCopyrightText: Copyright (c) 2016-2024, CloudZero, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package cz_test

Expand Down Expand Up @@ -39,7 +39,7 @@ func TestChecker_CheckOK(t *testing.T) {

accessor := makeReport()

err := provider.Check(context.Background(), client, accessor)
err := provider.Check(context.Background(), client, accessor, cfg)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
Expand All @@ -64,7 +64,7 @@ func TestChecker_CheckBadKey(t *testing.T) {
client := mock.HTTPClient()

accessor := makeReport()
err := provider.Check(context.Background(), client, accessor)
err := provider.Check(context.Background(), client, accessor, cfg)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/diagnostic/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"

"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
)

Expand All @@ -12,5 +13,5 @@ import (
type Provider interface {
// Check will perform a targeted check(s) setting meaningful values on the status object
// and only will return an error if the condition is unrecoverable
Check(_ context.Context, _ *http.Client, _ status.Accessor) error
Check(_ context.Context, _ *http.Client, _ status.Accessor, _ *config.Settings) error
}
9 changes: 4 additions & 5 deletions pkg/diagnostic/egress/check.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: Copyright (c) 2016-2024, CloudZero, Inc. or its affiliates. All Rights Reserved.
// SPDX-LicenseCopyrightText: Copyright (c) 2016-2024, CloudZero, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package egress

Expand Down Expand Up @@ -31,10 +31,9 @@ func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider
}
}

func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error {

// simple unuathenticated check for egress access
url := fmt.Sprintf("%s", c.cfg.Cloudzero.Host)
func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor, cfg *config.Settings) error {
// simple unauthenticated check for egress access
url := fmt.Sprintf("%s", cfg.Cloudzero.Host)
_, err := http.Do(ctx, client, net.MethodGet, nil, nil, url, nil)
if err == nil {
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticEgressAccess, Passing: true})
Expand Down
6 changes: 3 additions & 3 deletions pkg/diagnostic/egress/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestChecker_CheckOK(t *testing.T) {

accessor := makeReport()

err := provider.Check(context.Background(), client, accessor)
err := provider.Check(context.Background(), client, accessor, cfg)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
Expand All @@ -64,7 +64,7 @@ func TestChecker_CheckBadKey(t *testing.T) {
client := mock.HTTPClient()

accessor := makeReport()
err := provider.Check(context.Background(), client, accessor)
err := provider.Check(context.Background(), client, accessor, cfg)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
Expand All @@ -89,7 +89,7 @@ func TestChecker_CheckErrorCondition(t *testing.T) {
client := mock.HTTPClient()

accessor := makeReport()
err := provider.Check(context.Background(), client, accessor)
err := provider.Check(context.Background(), client, accessor, cfg)
assert.NoError(t, err)
accessor.ReadFromReport(func(s *status.ClusterStatus) {
assert.Len(t, s.Checks, 1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/diagnostic/k8s/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider
}
}

func (c *checker) Check(_ context.Context, client *http.Client, accessor status.Accessor) error {
func (c *checker) Check(_ context.Context, client *http.Client, accessor status.Accessor, cfg *config.Settings) error {
version, err := c.getK8sVersion(client)
if err != nil {
accessor.AddCheck(
Expand Down
10 changes: 4 additions & 6 deletions pkg/diagnostic/k8s/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func TestChecker_CheckOK(t *testing.T) {
cfg := &config.Settings{}

// IMPORTANT:
// 1. CI/CD will require a known K8s (kind) versionm
// 2. If you are running thislocal, I suggest deploying Rancher Desktop
// 1. CI/CD will require a known K8s (kind) version
// 2. If you are running this locally, I suggest deploying Rancher Desktop
//
// Ideally we improve our MockTransport to handle this
// Allowing us to overide the client config.Transport
// Allowing us to override the client config.Transport
//
// XXX: Replace with the expected version
expectedVersion := "1.29"
Expand All @@ -50,7 +50,7 @@ func TestChecker_CheckOK(t *testing.T) {
client := mock.HTTPClient()

accessor := makeReport()
err := provider.Check(context.Background(), client, accessor)
err := provider.Check(context.Background(), client, accessor, cfg)
assert.NoError(t, err)

accessor.ReadFromReport(func(s *status.ClusterStatus) {
Expand All @@ -61,5 +61,3 @@ func TestChecker_CheckOK(t *testing.T) {
assert.Equal(t, expectedVersion, s.K8SVersion)
})
}

// func TestK8s
129 changes: 111 additions & 18 deletions pkg/diagnostic/kms/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@ package kms
import (
"context"
"fmt"
"io/ioutil"
net "net/http"
"strings"
"time"

"github.com/cloudzero/cloudzero-agent-validator/pkg/config"
"github.com/cloudzero/cloudzero-agent-validator/pkg/diagnostic"
"github.com/cloudzero/cloudzero-agent-validator/pkg/http"
"github.com/cloudzero/cloudzero-agent-validator/pkg/logging"
"github.com/cloudzero/cloudzero-agent-validator/pkg/status"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const DiagnosticKMS = config.DiagnosticKMS
Expand All @@ -23,46 +28,134 @@ var (
)

type checker struct {
cfg *config.Settings
logger *logrus.Entry
cfg *config.Settings
logger *logrus.Entry
clientset kubernetes.Interface
}

func NewProvider(ctx context.Context, cfg *config.Settings) diagnostic.Provider {
func NewProvider(ctx context.Context, cfg *config.Settings, clientset ...kubernetes.Interface) diagnostic.Provider {
var cs kubernetes.Interface
if len(clientset) > 0 {
cs = clientset[0]
} else {
// Use the in-cluster config if running inside a cluster, otherwise use the default kubeconfig
config, err := rest.InClusterConfig()
if err != nil {
kubeconfig := clientcmd.RecommendedHomeFile
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
}

// Create the clientset
cs, err = kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
}

return &checker{
cfg: cfg,
logger: logging.NewLogger().
WithContext(ctx).WithField(logging.OpField, "ksm"),
clientset: cs,
}
}

func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor) error {
func (c *checker) Check(ctx context.Context, client *net.Client, accessor status.Accessor, cfg *config.Settings) error {
var (
err error
retriesRemaining = MaxRetry
url = fmt.Sprintf("%s/", c.cfg.Prometheus.KubeStateMetricsServiceEndpoint)
namespace = "prom-agent"
serviceName = "cz-prom-agent-kube-state-metrics"
Comment on lines +69 to +70
Copy link
Contributor Author

@bdrennz bdrennz Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: namespace and serviceName are hard-coded here and need to be passed in as arguments.

endpointURL string
)

// We need to build in a retry here because the kube-state-metrics service can take a few seconds to start up
// If it is deploying with the cloudzero-agent chart
for {
_, err = http.Do(ctx, client, net.MethodGet, nil, nil, url, nil)
if err == nil {
break
// Wait for the pod to become ready and find the first available endpoint
for retriesRemaining > 0 {
endpoints, err := c.clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
if err != nil {
c.logger.Errorf("Failed to get service endpoints: %v", err)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to get service endpoints: %s", err.Error())})
return nil
}
if retriesRemaining == 0 {

// Log the endpoints for debugging
c.logger.Infof("Endpoints: %v", endpoints)

// Check if there are any ready addresses and find the first available endpoint
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
c.logger.Infof("Address: %v", address)
for _, port := range subset.Ports {
c.logger.Infof("Port: %v", port)
if port.Port == 8080 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: The port (hard-coded to 8080 here) should be passed in as an argument.

endpointURL = fmt.Sprintf("http://%s:%d/metrics", address.IP, port.Port)
break
}
}
if endpointURL != "" {
break
}
}
if endpointURL != "" {
break
}
}

if endpointURL != "" {
break
}

c.logger.Infof("Pod is not ready, waiting...")
retriesRemaining--
time.Sleep(RetryInterval)
}

if err != nil {
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: err.Error()})
if retriesRemaining == 0 {
c.logger.Errorf("Pod did not become ready in time")
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: "Pod did not become ready in time"})
return nil
}

accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
return nil
c.logger.Infof("Using endpoint URL: %s", endpointURL)

// Retry logic to handle transient issues
retriesRemaining = MaxRetry
for retriesRemaining > 0 {
resp, err := client.Get(endpointURL)
if err == nil && resp.StatusCode == net.StatusOK {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
c.logger.Errorf("Failed to read metrics: %v", err)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to read metrics: %s", err.Error())})
return nil
}

metrics := string(body)
requiredMetrics := []string{"kube_pod_info", "kube_node_info"} // Add the required metrics here
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: Pass the full list of chart metrics in via the validator config file rather than hard-coding it here.

allMetricsFound := true
for _, metric := range requiredMetrics {
if !strings.Contains(metrics, metric) {
c.logger.Errorf("Required metric %s not found", metric)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Required metric %s not found", metric)})
allMetricsFound = false
}
}

if allMetricsFound {
c.logger.Infof("All required metrics found: %v", requiredMetrics)
accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: true})
}
return nil
}

c.logger.Errorf("Failed to fetch metrics: %v", err)
retriesRemaining--
time.Sleep(RetryInterval)
}

accessor.AddCheck(&status.StatusCheck{Name: DiagnosticKMS, Passing: false, Error: fmt.Sprintf("Failed to fetch metrics after %d retries", MaxRetry)})
return nil
}
Loading