Skip to content

Commit

Permalink
remove testing physical dispatching in alerting tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandreLamarre committed Oct 24, 2023
1 parent 44654a8 commit 34d6639
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 261 deletions.
86 changes: 13 additions & 73 deletions pkg/test/alerting/alert_router_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package alerting
import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -44,32 +41,14 @@ type MockIntegrationWebhookServer struct {
Webhook string
Port int
Addr string
*sync.RWMutex
Buffer []*config.WebhookMessage
}

func (m *MockIntegrationWebhookServer) WriteBuffer(msg *config.WebhookMessage) {
m.Lock()
defer m.Unlock()
m.Buffer = append(m.Buffer, msg)
}

func (m *MockIntegrationWebhookServer) ClearBuffer() {
m.Lock()
defer m.Unlock()
m.Buffer = m.Buffer[:0]
}

func (m *MockIntegrationWebhookServer) GetBuffer() []*config.WebhookMessage {
m.RLock()
defer m.RUnlock()
return lo.Map(m.Buffer, func(msg *config.WebhookMessage, _ int) *config.WebhookMessage {
return msg
})
}

func (m *MockIntegrationWebhookServer) GetWebhook() string {
return "http://" + path.Join(m.Addr, m.Webhook)
webhook := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", m.Addr, m.Port),
}
return webhook.String()
}

func (m *MockIntegrationWebhookServer) Endpoint() *alertingv1.AlertEndpoint {
Expand All @@ -85,60 +64,21 @@ func (m *MockIntegrationWebhookServer) Endpoint() *alertingv1.AlertEndpoint {
}
}

func CreateWebhookServer(e *test.Environment, num int) []*MockIntegrationWebhookServer {
func CreateWebhookServer(num int) []*MockIntegrationWebhookServer {
var servers []*MockIntegrationWebhookServer
for i := 0; i < num; i++ {
servers = append(servers, NewWebhookMemoryServer(e, "webhook"))
servers = append(servers, NewWebhookMemoryServer("webhook"))
}
return servers
}

func NewWebhookMemoryServer(e *test.Environment, webHookRoute string) *MockIntegrationWebhookServer {
port := freeport.GetFreePort()
buf := []*config.WebhookMessage{}
mu := &sync.RWMutex{}
mux := http.NewServeMux()
res := &MockIntegrationWebhookServer{
func NewWebhookMemoryServer(webHookRoute string) *MockIntegrationWebhookServer {
return &MockIntegrationWebhookServer{
EndpointId: shared.NewAlertingRefId("webhook"),
Webhook: webHookRoute,
Port: port,
Buffer: buf,
RWMutex: mu,
EndpointId: uuid.New().String(),
}
if !strings.HasPrefix(webHookRoute, "/") {
webHookRoute = "/" + webHookRoute
Port: 3000,
Addr: "127.0.0.1",
}
mux.HandleFunc(webHookRoute, func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}
var msg config.WebhookMessage
err = yaml.Unmarshal(data, &msg)
if err != nil {
panic(err)
}
res.WriteBuffer(&msg)
})
webhookServer := &http.Server{
Addr: fmt.Sprintf("localhost:%d", port),
Handler: mux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
}
res.Addr = webhookServer.Addr

go func() {
err := webhookServer.ListenAndServe()
if err != http.ErrServerClosed {
panic(err)
}
}()
context.AfterFunc(e.Context(), func() {
webhookServer.Shutdown(context.Background())
})
return res
}

type NamespaceSubTreeTestcase struct {
Expand Down
1 change: 1 addition & 0 deletions plugins/alerting/pkg/alerting/alarms/v1/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Runner struct {
func NewRunner() *Runner {
return &Runner{
systemConditionUpdateListeners: make(map[string]*EvaluatorContext),
systemConditionMu: &sync.Mutex{},
}
}

Expand Down
16 changes: 10 additions & 6 deletions plugins/alerting/pkg/alerting/alarms/v1/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1"
Expand Down Expand Up @@ -157,8 +158,9 @@ func (p *AlarmServerComponent) onSystemConditionCreate(conditionId, conditionNam
evaluator.EvaluateLoop()
}()
p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
running: &atomic.Bool{},
})
return nil
}
Expand Down Expand Up @@ -249,8 +251,9 @@ func (p *AlarmServerComponent) onDownstreamCapabilityConditionCreate(conditionId
evaluator.EvaluateLoop()
}()
p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
running: &atomic.Bool{},
})
return nil
}
Expand Down Expand Up @@ -487,8 +490,9 @@ func (p *AlarmServerComponent) onCortexClusterStatusCreate(conditionId, conditio
evaluator.EvaluateLoop()
}()
p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
running: &atomic.Bool{},
})
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/alerting/pkg/alerting/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *Plugin) handleDriverNotifications() {
p.logger.Info("shutting down cluster driver update handler")
return
case client := <-p.clusterNotifier:
p.logger.Info(fmt.Sprintf("updating alerting client based on cluster status : %v", client))
p.logger.Info("updating alerting client based on cluster status")
serverCfg := server.Config{
Client: client.Clone(),
}
Expand Down
Loading

0 comments on commit 34d6639

Please sign in to comment.