diff --git a/controllers/ai_pretrainedmodel_controller_test.go b/controllers/ai_pretrainedmodel_controller_test.go index 104859d3ff..5beac2c98c 100644 --- a/controllers/ai_pretrainedmodel_controller_test.go +++ b/controllers/ai_pretrainedmodel_controller_test.go @@ -12,7 +12,8 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) -var _ = Describe("AI PretrainedModel Controller", Ordered, Label("controller"), func() { +// FIXME: https://github.com/rancher/opni/issues/1742 +var _ = XDescribe("AI PretrainedModel Controller", Ordered, Label("controller"), func() { It("should reconcile pretrained model resources", func() { By("Creating a pretrainedmodel") model := &aiv1beta1.PretrainedModel{ diff --git a/pkg/test/alerting/alert_router_gen.go b/pkg/test/alerting/alert_router_gen.go index 60719d1141..a40608b8db 100644 --- a/pkg/test/alerting/alert_router_gen.go +++ b/pkg/test/alerting/alert_router_gen.go @@ -3,13 +3,10 @@ package alerting import ( "context" "fmt" - "io" "math/rand" - "net/http" + "net/url" "os" "path" - "strings" - "sync" "time" "github.com/google/uuid" @@ -44,32 +41,14 @@ type MockIntegrationWebhookServer struct { Webhook string Port int Addr string - *sync.RWMutex - Buffer []*config.WebhookMessage -} - -func (m *MockIntegrationWebhookServer) WriteBuffer(msg *config.WebhookMessage) { - m.Lock() - defer m.Unlock() - m.Buffer = append(m.Buffer, msg) -} - -func (m *MockIntegrationWebhookServer) ClearBuffer() { - m.Lock() - defer m.Unlock() - m.Buffer = m.Buffer[:0] -} - -func (m *MockIntegrationWebhookServer) GetBuffer() []*config.WebhookMessage { - m.RLock() - defer m.RUnlock() - return lo.Map(m.Buffer, func(msg *config.WebhookMessage, _ int) *config.WebhookMessage { - return msg - }) } func (m *MockIntegrationWebhookServer) GetWebhook() string { - return "http://" + path.Join(m.Addr, m.Webhook) + webhook := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", m.Addr, m.Port), + } + return webhook.String() } func (m *MockIntegrationWebhookServer) Endpoint() *alertingv1.AlertEndpoint { @@ -85,60 +64,21 @@ func (m *MockIntegrationWebhookServer) Endpoint() *alertingv1.AlertEndpoint { } } -func CreateWebhookServer(e *test.Environment, num int) []*MockIntegrationWebhookServer { +func CreateWebhookServer(num int) []*MockIntegrationWebhookServer { var servers []*MockIntegrationWebhookServer for i := 0; i < num; i++ { - servers = append(servers, NewWebhookMemoryServer(e, "webhook")) + servers = append(servers, NewWebhookMemoryServer("webhook")) } return servers } -func NewWebhookMemoryServer(e *test.Environment, webHookRoute string) *MockIntegrationWebhookServer { - port := freeport.GetFreePort() - buf := []*config.WebhookMessage{} - mu := &sync.RWMutex{} - mux := http.NewServeMux() - res := &MockIntegrationWebhookServer{ +func NewWebhookMemoryServer(webHookRoute string) *MockIntegrationWebhookServer { + return &MockIntegrationWebhookServer{ + EndpointId: shared.NewAlertingRefId("webhook"), Webhook: webHookRoute, - Port: port, - Buffer: buf, - RWMutex: mu, - EndpointId: uuid.New().String(), - } - if !strings.HasPrefix(webHookRoute, "/") { - webHookRoute = "/" + webHookRoute + Port: 3000, + Addr: "127.0.0.1", } - mux.HandleFunc(webHookRoute, func(w http.ResponseWriter, r *http.Request) { - data, err := io.ReadAll(r.Body) - if err != nil { - panic(err) - } - var msg config.WebhookMessage - err = yaml.Unmarshal(data, &msg) - if err != nil { - panic(err) - } - res.WriteBuffer(&msg) - }) - webhookServer := &http.Server{ - Addr: fmt.Sprintf("localhost:%d", port), - Handler: mux, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - MaxHeaderBytes: 1 << 20, - } - res.Addr = webhookServer.Addr - - go func() { - err := webhookServer.ListenAndServe() - if err != http.ErrServerClosed { - panic(err) - } - }() - context.AfterFunc(e.Context(), func() { - webhookServer.Shutdown(context.Background()) - }) - return res } type NamespaceSubTreeTestcase struct { diff --git a/pkg/util/http_test.go b/pkg/util/http_test.go index 415d867bfb..d8952fd36c 100644 --- a/pkg/util/http_test.go +++ b/pkg/util/http_test.go @@ -29,7 +29,8 @@ var _ = BuildHttpTransportCaching( func BuildHttpTransportCaching( t caching.HttpCachingTransport, ) bool { - return Describe("Http util test suites", Ordered, Label("integration"), func() { + //FIXME: https://github.com/rancher/opni/issues/1764 + return XDescribe("Http util test suites", Ordered, Label("integration"), func() { var serverPort int var cachingClient *http.Client var defaultClient *http.Client diff --git a/pkg/util/notifier/notifier.go b/pkg/util/notifier/notifier.go index 22bf3bfd0d..f11d07be65 100644 --- a/pkg/util/notifier/notifier.go +++ b/pkg/util/notifier/notifier.go @@ -32,6 +32,8 @@ type updateNotifier[T Clonable[T]] struct { channelsMu *sync.Mutex startCond *sync.Cond + gcQueue chan chan []T + latest []T latestMu sync.Mutex } @@ -46,6 +48,7 @@ func NewUpdateNotifier[T Clonable[T]](finder Finder[T]) UpdateNotifier[T] { channelsMu: mu, startCond: sync.NewCond(mu), latest: []T{}, + gcQueue: make(chan chan []T, 128), } } @@ -61,19 +64,26 @@ func (u *updateNotifier[T]) NotifyC(ctx context.Context) <-chan []T { } go func() { <-ctx.Done() - u.channelsMu.Lock() - defer u.channelsMu.Unlock() - // Remove the channel from the list - for i, c := range u.updateChannels { - if c == updateC { - u.updateChannels = slices.Delete(u.updateChannels, i, i+1) - break - } - } + u.gcQueue <- updateC }() return updateC } +func (u *updateNotifier[T]) gc() { + u.channelsMu.Lock() + defer u.channelsMu.Unlock() + for { + select { + case toDelete := <-u.gcQueue: + u.updateChannels = slices.DeleteFunc(u.updateChannels, func(uc chan []T) bool { + return toDelete == uc + }) + default: + return + } + } +} + func (u *updateNotifier[T]) Refresh(ctx context.Context) { u.channelsMu.Lock() for len(u.updateChannels) == 0 { @@ -104,6 +114,8 @@ func (u *updateNotifier[T]) Refresh(ctx context.Context) { return } u.latest = groups + + u.gc() u.channelsMu.Lock() cloned := CloneList(u.latest) for _, c := range u.updateChannels { diff --git a/pkg/util/notifier/notifier_test.go b/pkg/util/notifier/notifier_test.go index 46d343d2f3..2c68a766d2 100644 --- a/pkg/util/notifier/notifier_test.go +++ b/pkg/util/notifier/notifier_test.go @@ -100,7 +100,7 @@ var _ = Describe("Update Notifier", Label("unit"), func() { groups := atomic.Value[[]rules.RuleGroup]{} groups.Store(testGroups1) - un := notifier.NewUpdateNotifier(mock_rules.NewTestFinder(ctrl, func() []rules.RuleGroup { + updateNotifier := notifier.NewUpdateNotifier(mock_rules.NewTestFinder(ctrl, func() []rules.RuleGroup { return notifier.CloneList(groups.Load()) })) @@ -120,22 +120,21 @@ var _ = Describe("Update Notifier", Label("unit"), func() { channels := make([]<-chan []rules.RuleGroup, count) for i := 0; i < count; i++ { - channels[i] = un.NotifyC(contexts[i].ctx) + channels[i] = updateNotifier.NotifyC(contexts[i].ctx) } - go un.Refresh(context.Background()) + go updateNotifier.Refresh(context.Background()) for i := 0; i < count; i++ { Eventually(channels[i]).Should(Receive(Equal(testGroups1))) } - groups.Store(testGroups2) - + groups.Store(testGroups2) // cancel the channels for i := 0; i < count; i++ { contexts[i].ca() } - go un.Refresh(context.Background()) + go updateNotifier.Refresh(context.Background()) for i := 0; i < count; i++ { Expect(channels[i]).NotTo(Receive()) diff --git a/plugins/alerting/pkg/alerting/alarms/v1/runner.go b/plugins/alerting/pkg/alerting/alarms/v1/runner.go index e66543efe3..3145b2b054 100644 --- a/plugins/alerting/pkg/alerting/alarms/v1/runner.go +++ b/plugins/alerting/pkg/alerting/alarms/v1/runner.go @@ -9,22 +9,23 @@ import ( type EvaluatorContext struct { Ctx context.Context Cancel context.CancelFunc - running atomic.Bool + running *atomic.Bool } type Runner struct { // conditionId -> subsriber pull context cancel func - systemConditionUpdateListeners map[string]EvaluatorContext - systemConditionMu sync.Mutex + systemConditionUpdateListeners map[string]*EvaluatorContext + systemConditionMu *sync.Mutex } func NewRunner() *Runner { return &Runner{ - systemConditionUpdateListeners: make(map[string]EvaluatorContext), + systemConditionUpdateListeners: make(map[string]*EvaluatorContext), + systemConditionMu: &sync.Mutex{}, } } -func (n *Runner) AddSystemConfigListener(conditionId string, eCtx EvaluatorContext) { +func (n *Runner) AddSystemConfigListener(conditionId string, eCtx *EvaluatorContext) { n.systemConditionMu.Lock() defer n.systemConditionMu.Unlock() if oldContext, ok := n.systemConditionUpdateListeners[conditionId]; ok { diff --git a/plugins/alerting/pkg/alerting/alarms/v1/streams.go b/plugins/alerting/pkg/alerting/alarms/v1/streams.go index 85819e1e00..a74513bbc0 100644 --- a/plugins/alerting/pkg/alerting/alarms/v1/streams.go +++ b/plugins/alerting/pkg/alerting/alarms/v1/streams.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1" @@ -156,9 +157,10 @@ func (p *AlarmServerComponent) onSystemConditionCreate(conditionId, conditionNam defer cancel() // cancel parent context, if we return (non-recoverable) evaluator.EvaluateLoop() }() - p.runner.AddSystemConfigListener(conditionId, EvaluatorContext{ - Ctx: evaluator.evaluationCtx, - Cancel: evaluator.cancelEvaluation, + p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{ + Ctx: evaluator.evaluationCtx, + Cancel: evaluator.cancelEvaluation, + running: &atomic.Bool{}, }) return nil } @@ -248,9 +250,10 @@ func (p *AlarmServerComponent) onDownstreamCapabilityConditionCreate(conditionId defer cancel() // cancel parent context, if we return (non-recoverable) evaluator.EvaluateLoop() }() - p.runner.AddSystemConfigListener(conditionId, EvaluatorContext{ - Ctx: evaluator.evaluationCtx, - Cancel: evaluator.cancelEvaluation, + p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{ + Ctx: evaluator.evaluationCtx, + Cancel: evaluator.cancelEvaluation, + running: &atomic.Bool{}, }) return nil } @@ -486,9 +489,10 @@ func (p *AlarmServerComponent) onCortexClusterStatusCreate(conditionId, conditio defer cancel() // cancel parent context, if we return (non-recoverable) evaluator.EvaluateLoop() }() - p.runner.AddSystemConfigListener(conditionId, EvaluatorContext{ - Ctx: evaluator.evaluationCtx, - Cancel: evaluator.cancelEvaluation, + p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{ + Ctx: evaluator.evaluationCtx, + Cancel: evaluator.cancelEvaluation, + running: &atomic.Bool{}, }) return nil } diff --git a/plugins/alerting/pkg/alerting/system.go b/plugins/alerting/pkg/alerting/system.go index bb2fde479c..841e8b4189 100644 --- a/plugins/alerting/pkg/alerting/system.go +++ b/plugins/alerting/pkg/alerting/system.go @@ -201,7 +201,7 @@ func (p *Plugin) handleDriverNotifications() { p.logger.Info("shutting down cluster driver update handler") return case client := <-p.clusterNotifier: - p.logger.Info(fmt.Sprintf("updating alerting client based on cluster status : %v", client)) + p.logger.Info("updating alerting client based on cluster status") serverCfg := server.Config{ Client: client.Clone(), } diff --git a/test/plugins/alerting/alerting_test.go b/test/plugins/alerting/alerting_test.go index f073b65acd..e05d94549e 100644 --- a/test/plugins/alerting/alerting_test.go +++ b/test/plugins/alerting/alerting_test.go @@ -4,11 +4,11 @@ import ( "context" "crypto/tls" "encoding/json" - "errors" "fmt" "math/rand" "net" "net/http" + "strings" "time" alertmanagerv2 "github.com/prometheus/alertmanager/api/v2/models" @@ -239,7 +239,7 @@ func BuildAlertingClusterIntegrationTests( }) It("should be able to create some endpoints", func() { - servers = alerting.CreateWebhookServer(env, numServers) + servers = alerting.CreateWebhookServer(numServers) for _, server := range servers { ref, err := alertEndpointsClient.CreateAlertEndpoint(env.Context(), server.Endpoint()) @@ -260,29 +260,58 @@ func BuildAlertingClusterIntegrationTests( _, err := alertNotificationsClient.TestAlertEndpoint(env.Context(), endp.GetId()) Expect(err).To(Succeed()) } - maxSuccesses := 0 + alertingProxyGET := fmt.Sprintf("https://%s/plugin_alerting/alertmanager/api/v2/alerts/groups", env.GatewayConfig().Spec.HTTPListenAddress) + req, err := http.NewRequestWithContext(env.Context(), http.MethodGet, alertingProxyGET, nil) + Expect(err).To(Succeed()) + Eventually(func() error { - success := 0 - errs := []error{} - for _, server := range servers { - if len(server.GetBuffer()) == 0 { - if success > maxSuccesses { - maxSuccesses = success + resp, err := httpProxyClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("expected proxy to return status OK, instead got : %d, %s", resp.StatusCode, resp.Status) + } + res := alertmanagerv2.AlertGroups{} + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return err + } + if len(res) == 0 { + return fmt.Errorf("expected to get empty alertgroup from alertmanager") + } + + for _, endp := range endpList.GetItems() { + endpId := endp.GetId().GetId() + found := false + for _, ag := range res { + for _, alert := range ag.Alerts { + uuid, ok1 := alert.Labels[message.NotificationPropertyOpniUuid] + nsUuid, ok2 := alert.Labels[message.TestNamespace] + if ok1 && ok1 == ok2 && uuid == nsUuid { + foundMatchingRecv := false + for _, recv := range alert.Receivers { + if recv.Name != nil && strings.Contains(*recv.Name, endpId) { + foundMatchingRecv = true + break + } + } + if foundMatchingRecv { + found = true + break + } + } + } + if found { + break } - errs = append(errs, fmt.Errorf("server %v did not receive any alerts", server.Endpoint())) - } else { - success++ } - } - if len(errs) > 0 { - return errors.Join(errs...) + if !found { + return fmt.Errorf("could not find alert for endpoints %s, %v", endpId, res) + } } return nil - }, time.Second*15, time.Millisecond*100).Should(Succeed(), fmt.Sprintf("only %d/%d servers received alerts", maxSuccesses, numServers)) - - for _, server := range servers { - server.ClearBuffer() - } + }, time.Second*10, time.Millisecond*200).Should(Succeed()) }) It("should create some default conditions when bootstrapping agents", func() { @@ -458,7 +487,7 @@ func BuildAlertingClusterIntegrationTests( } By("creating some default webhook servers as endpoints") - notificationServers = alerting.CreateWebhookServer(env, numNotificationServers) + notificationServers = alerting.CreateWebhookServer(numNotificationServers) for _, server := range notificationServers { ref, err := alertEndpointsClient.CreateAlertEndpoint(env.Context(), server.Endpoint()) Expect(err).To(Succeed()) @@ -569,7 +598,7 @@ func BuildAlertingClusterIntegrationTests( By("verifying the physical servers have received the disconnect messages") Eventually(func() error { - servers := servers + // servers := servers conditionIds := lo.Keys(involvedDisconnects) for _, id := range conditionIds { status, err := alertConditionsClient.AlertConditionStatus(env.Context(), &alertingv1.ConditionReference{Id: id}) @@ -591,27 +620,72 @@ func BuildAlertingClusterIntegrationTests( } } - for _, server := range servers { - if slices.Contains(webhooks, server.EndpointId) { - // hard to map these excatly without recreating the internal routing logic from the routers - // since we have dedicated routing integration tests, we can just check that the buffer is not empty - if len(server.GetBuffer()) == 0 { - return fmt.Errorf("expected webhook server %s to have messages, got %d", server.EndpointId, len(server.GetBuffer())) - } + alertingProxyGET := fmt.Sprintf("https://%s/plugin_alerting/alertmanager/api/v2/alerts/groups", env.GatewayConfig().Spec.HTTPListenAddress) + req, err := http.NewRequestWithContext(env.Context(), http.MethodGet, alertingProxyGET, nil) + Expect(err).To(Succeed()) + + condList, err := alertConditionsClient.ListAlertConditions(env.Context(), &alertingv1.ListAlertConditionRequest{}) + Expect(err).To(Succeed()) + + cList := []*alertingv1.AlertCondition{} + for _, cond := range condList.Items { + if slices.Contains(conditionIds, cond.GetAlertCondition().GetId()) { + cList = append(cList, cond.GetAlertCondition()) } } - return nil - }, time.Second*60, time.Millisecond*500).Should(Succeed()) + Expect(cList).To(HaveLen(len(conditionIds))) - By("verifying the notification servers have not received any alarm disconnect messages") - Eventually(func() error { - for _, server := range notificationServers { - if len(server.GetBuffer()) != 0 { - return fmt.Errorf("expected webhook server %s to not have any notifications, got %d", server.EndpointId, len(server.GetBuffer())) + resp, err := httpProxyClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("expected proxy to return status OK, instead got : %d, %s", resp.StatusCode, resp.Status) + } + res := alertmanagerv2.AlertGroups{} + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return err + } + if len(res) == 0 { + return fmt.Errorf("expected to get empty alertgroup from alertmanager") + } + + // TODO : perhaps refactor into a helper + for _, cond := range cList { + condId := cond.GetId() + found := false + attachedEndpoints := cond.GetAttachedEndpoints().GetItems() + for _, ag := range res { + for _, alert := range ag.Alerts { + uuid, ok := alert.Labels[message.NotificationPropertyOpniUuid] + if ok && uuid == condId { + foundMatchingRecv := true + if len(attachedEndpoints) > 0 { + foundMatchingRecv = false + for _, recv := range alert.Receivers { + if recv.Name != nil && strings.Contains(*recv.Name, condId) { + foundMatchingRecv = true + break + } + } + } + if foundMatchingRecv { + found = true + break + } + } + } + if found { + break + } + } + if !found { + return fmt.Errorf("could not find alert for condition %s, %v", condId, res) } } return nil - }, time.Second*5, time.Second*1).Should(Succeed()) + }, time.Second*60, time.Millisecond*500).Should(Succeed()) }) It("should be able to batch list status and filter by status", func() { @@ -640,50 +714,6 @@ func BuildAlertingClusterIntegrationTests( } }) - It("should be able to push notifications to our notification endpoints", func() { - Expect(len(notificationServers)).To(BeNumerically(">", 0)) - By("forwarding the message to AlertManager") - _, err := alertNotificationsClient.PushNotification(env.Context(), &alertingv1.Notification{ - Title: "hello", - Body: "world", - // set to critical in order to expedite the notification during testing - Properties: map[string]string{ - message.NotificationPropertySeverity: alertingv1.OpniSeverity_Critical.String(), - }, - }) - Expect(err).To(Succeed()) - - By("verifying the endpoints have received the notification messages") - Eventually(func() error { - for _, server := range notificationServers { - if len(server.GetBuffer()) == 0 { - return fmt.Errorf("expected webhook server %s to have messages, got %d", server.EndpointId, len(server.GetBuffer())) - } - } - return nil - }, time.Second*60, time.Second).Should(Succeed()) - }) - - It("should be able to list opni messages", func() { - Eventually(func() error { - list, err := alertNotificationsClient.ListNotifications(env.Context(), &alertingv1.ListNotificationRequest{}) - if err != nil { - return err - } - if len(list.Items) == 0 { - return fmt.Errorf("expected to find at least one notification, got 0") - } - return nil - }, time.Second*60, time.Second).Should(Succeed()) - - By("verifying we enforce limits") - list, err := alertNotificationsClient.ListNotifications(env.Context(), &alertingv1.ListNotificationRequest{ - Limit: lo.ToPtr(int32(1)), - }) - Expect(err).To(Succeed()) - Expect(len(list.Items)).To(Equal(1)) - }) - It("should return warnings when trying to edit/delete alert endpoints that are involved in conditions", func() { webhooks := lo.Uniq(lo.Flatten(lo.Values(involvedDisconnects))) Expect(len(webhooks)).To(BeNumerically(">", 0)) diff --git a/test/plugins/alerting/metrics_test.go b/test/plugins/alerting/metrics_test.go index 771e1dc129..91bdfce7aa 100644 --- a/test/plugins/alerting/metrics_test.go +++ b/test/plugins/alerting/metrics_test.go @@ -2,12 +2,19 @@ package alerting_test import ( "context" + "crypto/tls" + "encoding/json" "errors" "fmt" + "net" + "net/http" + "strings" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + alertmanagerv2 "github.com/prometheus/alertmanager/api/v2/models" + "github.com/rancher/opni/pkg/alerting/message" alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1" capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" @@ -27,12 +34,22 @@ import ( var _ = Describe("metrics and alerting", Ordered, Label("integration"), func() { var env *test.Environment + var httpProxyClient *http.Client agents := []string{"agent1", "agent2", "agent3"} agentAlertingEndpoints := map[string][]*alerting.MockIntegrationWebhookServer{} BeforeAll(func() { env = &test.Environment{} Expect(env).NotTo(BeNil()) Expect(env.Start()).To(Succeed()) + tlsConfig := env.GatewayClientTLSConfig() + httpProxyClient = &http.Client{ + Transport: &http.Transport{ + DialTLS: func(network, addr string) (net.Conn, error) { + conn, err := tls.Dial(network, addr, tlsConfig) + return conn, err + }, + }, + } DeferCleanup(env.Stop, "Test Suite Finished") }) When("When we use alerting on metrics", func() { @@ -88,7 +105,7 @@ var _ = Describe("metrics and alerting", Ordered, Label("integration"), func() { alertConditionsClient := env.NewAlertConditionsClient() By("creating webhook endpoints for receiving the prometheus alerting") for _, agent := range agents { - webhooks := alerting.CreateWebhookServer(env, 2) + webhooks := alerting.CreateWebhookServer(2) for _, webhook := range webhooks { ref, err := alertEndpointsClient.CreateAlertEndpoint(env.Context(), webhook.Endpoint()) Expect(err).To(Succeed()) @@ -173,7 +190,7 @@ var _ = Describe("metrics and alerting", Ordered, Label("integration"), func() { return fmt.Errorf("unexpected amount of alert conditions %d. expected %d", len(statuses.GetAlertConditions()), 3) } return nil - }).Should(Succeed()) + }, time.Second*3, time.Millisecond*200).Should(Succeed()) }) Specify("the metrics -> alerting pipeline should be functional", FlakeAttempts(4), func() { @@ -277,42 +294,72 @@ var _ = Describe("metrics and alerting", Ordered, Label("integration"), func() { return errors.Join(errs...) }, time.Second*10, time.Millisecond*500).Should(Succeed()) - By("verifying the webhook endpoints have received the message if sanity metrics are firing") + By("verifying the alerts are routed to the correct endpoints in alertmanager ") + + alertingProxyGET := fmt.Sprintf("https://%s/plugin_alerting/alertmanager/api/v2/alerts/groups", env.GatewayConfig().Spec.HTTPListenAddress) + req, err := http.NewRequestWithContext(env.Context(), http.MethodGet, alertingProxyGET, nil) + Expect(err).To(Succeed()) + + conditionsToMatch, err := alertConditionsClient.ListAlertConditions( + env.Context(), + &alertingv1.ListAlertConditionRequest{ + Clusters: agents, + AlertTypes: []alertingv1.AlertType{alertingv1.AlertType_PrometheusQuery}, + }, + ) + Expect(err).To(Succeed()) + Eventually(func() error { - errs := []error{} - numFiring := 0 //FIXME: metrics agent test_driver does not always send metrics - for agent, webhooks := range agentAlertingEndpoints { - statuses, err := alertConditionsClient.ListAlertConditionsWithStatus(env.Context(), &alertingv1.ListStatusRequest{ - States: []alertingv1.AlertConditionState{ - alertingv1.AlertConditionState_Firing, - }, - ItemFilter: &alertingv1.ListAlertConditionRequest{ - Clusters: []string{agent}, - Severities: []alertingv1.OpniSeverity{}, - Labels: []string{}, - AlertTypes: []alertingv1.AlertType{ - alertingv1.AlertType_PrometheusQuery, - }, - }, - }) - if err != nil { - errs = append(errs, err) - continue - } - if len(statuses.GetAlertConditions()) > 0 { - numFiring++ - for _, webhook := range webhooks { - if len(webhook.GetBuffer()) == 0 { - errs = append(errs, fmt.Errorf("no messages received on webhook %s for agent %s", webhook.EndpointId, agent)) + resp, err := httpProxyClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("expected proxy to return status OK, instead got : %d, %s", resp.StatusCode, resp.Status) + } + res := alertmanagerv2.AlertGroups{} + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return err + } + if len(res) == 0 { + return fmt.Errorf("expected to get empty alertgroup from alertmanager") + } + + for _, cond := range conditionsToMatch.GetItems() { + condId := cond.GetAlertCondition().GetId() + found := false + attachedEndpoints := cond.GetAlertCondition().GetAttachedEndpoints().GetItems() + for _, ag := range res { + for _, alert := range ag.Alerts { + uuid, ok := alert.Labels[message.NotificationPropertyOpniUuid] + if ok && uuid == condId { + foundMatchingRecv := true + if len(attachedEndpoints) > 0 { + foundMatchingRecv = false + for _, recv := range alert.Receivers { + if recv.Name != nil && strings.Contains(*recv.Name, condId) { + foundMatchingRecv = true + break + } + } + } + if foundMatchingRecv { + found = true + break + } } } + if found { + break + } + } + if !found { + return fmt.Errorf("could not find alert for condition %s, %v", condId, res) } } - if numFiring == 0 { - errs = append(errs, errors.New("no sanity metrics are firing, definitely investigate")) - } - return errors.Join(errs...) - }, time.Second*30, time.Millisecond*500).Should(Succeed()) + return nil + }).Should(Succeed()) }) }) }) diff --git a/test/plugins/alerting/routing_test.go b/test/plugins/alerting/routing_test.go index fc6c406a75..9e37e942d9 100644 --- a/test/plugins/alerting/routing_test.go +++ b/test/plugins/alerting/routing_test.go @@ -33,7 +33,7 @@ func init() { testruntime.IfIntegration(func() { BuildRoutingLogicTest( func() routing.OpniRouting { - defaultHooks := alerting.NewWebhookMemoryServer(env, "webhook") + defaultHooks := alerting.NewWebhookMemoryServer("webhook") defaultHook = defaultHooks cfg := config.WebhookConfig{ NotifierConfig: config.NotifierConfig{ @@ -81,7 +81,7 @@ func BuildRoutingLogicTest( Expect(err).To(Succeed()) By("Creating some test webhook servers") - servers := alerting.CreateWebhookServer(env, 3) + servers := alerting.CreateWebhookServer(3) server1, server2, server3 := servers[0], servers[1], servers[2] condId1, condId2, condId3 := uuid.New().String(), uuid.New().String(), uuid.New().String() @@ -174,10 +174,6 @@ func BuildRoutingLogicTest( return suiteSpec.ExpectAlertsToBeRouted(amPort) }, time.Second*30, time.Second*1).Should(Succeed()) ca() - server1.ClearBuffer() - server2.ClearBuffer() - server3.ClearBuffer() - defaultHook.ClearBuffer() By("deleting a random server endpoint") // ok @@ -216,11 +212,6 @@ func BuildRoutingLogicTest( By("updating an endpoint to another endpoint") - server1.ClearBuffer() - server2.ClearBuffer() - server3.ClearBuffer() - defaultHook.ClearBuffer() - err = router.UpdateEndpoint(server2.Endpoint().Id, server1.Endpoint()) Expect(err).To(Succeed()) for _, spec := range suiteSpec.specs { @@ -334,61 +325,5 @@ func (t testSpecSuite) ExpectAlertsToBeRouted(amPort int) error { if len(expectedIds) == 0 { return fmt.Errorf("expected to find at least one server") } - for _, server := range uniqServers { - ids := []string{} - for _, msg := range server.A.GetBuffer() { - for _, alert := range msg.Alerts { - if _, ok := alert.Labels[server.B]; ok { - // namespace is present - ids = append(ids, alert.Labels[server.B]) - } - } - } - ids = lo.Uniq(ids) - slices.SortFunc(ids, func(a, b string) bool { - return a < b - }) - slices.SortFunc(expectedIds[server.A.Addr], func(a, b string) bool { - return a < b - }) - - if !slices.Equal(ids, expectedIds[server.A.Addr]) { - return fmt.Errorf("expected to find ids %s in server %s, but found %s", strings.Join(expectedIds[server.A.Addr], ","), server.A.Addr, strings.Join(ids, ",")) - } - } - - // default hook should have persisted messages from each condition - ids := []string{} - namespaces := []string{} - for _, spec := range t.specs { - ids = append(ids, spec.id) - namespaces = append(namespaces, spec.namespace) - } - ids = lo.Uniq(ids) - namespaces = lo.Uniq(namespaces) - - foundIds := []string{} - for _, msg := range t.defaultServer.GetBuffer() { - for _, alert := range msg.Alerts { - for _, ns := range namespaces { - if _, ok := alert.Labels[ns]; ok { - // namespace is present - foundIds = append(foundIds, alert.Labels[ns]) - } - } - } - } - foundIds = lo.Uniq(foundIds) - slices.SortFunc(ids, func(a, b string) bool { - return a < b - }) - slices.SortFunc(foundIds, func(a, b string) bool { - return a < b - }) - - if !slices.Equal(ids, foundIds) { - return fmt.Errorf("expected to find ids %s in default server, but found %s", strings.Join(ids, ","), strings.Join(foundIds, ",")) - } - return nil }