Skip to content

Commit

Permalink
Merge pull request #1761 from rancher/flake-fixes
Browse files Browse the repository at this point in the history
Flake fixes
  • Loading branch information
alexandreLamarre authored Nov 6, 2023
2 parents f6ab262 + 83e9c92 commit 41dae0d
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 286 deletions.
3 changes: 2 additions & 1 deletion controllers/ai_pretrainedmodel_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("AI PretrainedModel Controller", Ordered, Label("controller"), func() {
// FIXME: https://github.com/rancher/opni/issues/1742
var _ = XDescribe("AI PretrainedModel Controller", Ordered, Label("controller"), func() {
It("should reconcile pretrained model resources", func() {
By("Creating a pretrainedmodel")
model := &aiv1beta1.PretrainedModel{
Expand Down
86 changes: 13 additions & 73 deletions pkg/test/alerting/alert_router_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package alerting
import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -44,32 +41,14 @@ type MockIntegrationWebhookServer struct {
Webhook string
Port int
Addr string
*sync.RWMutex
Buffer []*config.WebhookMessage
}

func (m *MockIntegrationWebhookServer) WriteBuffer(msg *config.WebhookMessage) {
m.Lock()
defer m.Unlock()
m.Buffer = append(m.Buffer, msg)
}

func (m *MockIntegrationWebhookServer) ClearBuffer() {
m.Lock()
defer m.Unlock()
m.Buffer = m.Buffer[:0]
}

func (m *MockIntegrationWebhookServer) GetBuffer() []*config.WebhookMessage {
m.RLock()
defer m.RUnlock()
return lo.Map(m.Buffer, func(msg *config.WebhookMessage, _ int) *config.WebhookMessage {
return msg
})
}

func (m *MockIntegrationWebhookServer) GetWebhook() string {
return "http://" + path.Join(m.Addr, m.Webhook)
webhook := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", m.Addr, m.Port),
}
return webhook.String()
}

func (m *MockIntegrationWebhookServer) Endpoint() *alertingv1.AlertEndpoint {
Expand All @@ -85,60 +64,21 @@ func (m *MockIntegrationWebhookServer) Endpoint() *alertingv1.AlertEndpoint {
}
}

func CreateWebhookServer(e *test.Environment, num int) []*MockIntegrationWebhookServer {
func CreateWebhookServer(num int) []*MockIntegrationWebhookServer {
var servers []*MockIntegrationWebhookServer
for i := 0; i < num; i++ {
servers = append(servers, NewWebhookMemoryServer(e, "webhook"))
servers = append(servers, NewWebhookMemoryServer("webhook"))
}
return servers
}

func NewWebhookMemoryServer(e *test.Environment, webHookRoute string) *MockIntegrationWebhookServer {
port := freeport.GetFreePort()
buf := []*config.WebhookMessage{}
mu := &sync.RWMutex{}
mux := http.NewServeMux()
res := &MockIntegrationWebhookServer{
func NewWebhookMemoryServer(webHookRoute string) *MockIntegrationWebhookServer {
return &MockIntegrationWebhookServer{
EndpointId: shared.NewAlertingRefId("webhook"),
Webhook: webHookRoute,
Port: port,
Buffer: buf,
RWMutex: mu,
EndpointId: uuid.New().String(),
}
if !strings.HasPrefix(webHookRoute, "/") {
webHookRoute = "/" + webHookRoute
Port: 3000,
Addr: "127.0.0.1",
}
mux.HandleFunc(webHookRoute, func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}
var msg config.WebhookMessage
err = yaml.Unmarshal(data, &msg)
if err != nil {
panic(err)
}
res.WriteBuffer(&msg)
})
webhookServer := &http.Server{
Addr: fmt.Sprintf("localhost:%d", port),
Handler: mux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
}
res.Addr = webhookServer.Addr

go func() {
err := webhookServer.ListenAndServe()
if err != http.ErrServerClosed {
panic(err)
}
}()
context.AfterFunc(e.Context(), func() {
webhookServer.Shutdown(context.Background())
})
return res
}

type NamespaceSubTreeTestcase struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ var _ = BuildHttpTransportCaching(
func BuildHttpTransportCaching(
t caching.HttpCachingTransport,
) bool {
return Describe("Http util test suites", Ordered, Label("integration"), func() {
//FIXME: https://github.com/rancher/opni/issues/1764
return XDescribe("Http util test suites", Ordered, Label("integration"), func() {
var serverPort int
var cachingClient *http.Client
var defaultClient *http.Client
Expand Down
30 changes: 21 additions & 9 deletions pkg/util/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type updateNotifier[T Clonable[T]] struct {
channelsMu *sync.Mutex
startCond *sync.Cond

gcQueue chan chan []T

latest []T
latestMu sync.Mutex
}
Expand All @@ -46,6 +48,7 @@ func NewUpdateNotifier[T Clonable[T]](finder Finder[T]) UpdateNotifier[T] {
channelsMu: mu,
startCond: sync.NewCond(mu),
latest: []T{},
gcQueue: make(chan chan []T, 128),
}
}

Expand All @@ -61,19 +64,26 @@ func (u *updateNotifier[T]) NotifyC(ctx context.Context) <-chan []T {
}
go func() {
<-ctx.Done()
u.channelsMu.Lock()
defer u.channelsMu.Unlock()
// Remove the channel from the list
for i, c := range u.updateChannels {
if c == updateC {
u.updateChannels = slices.Delete(u.updateChannels, i, i+1)
break
}
}
u.gcQueue <- updateC
}()
return updateC
}

func (u *updateNotifier[T]) gc() {
u.channelsMu.Lock()
defer u.channelsMu.Unlock()
for {
select {
case toDelete := <-u.gcQueue:
u.updateChannels = slices.DeleteFunc(u.updateChannels, func(uc chan []T) bool {
return toDelete == uc
})
default:
return
}
}
}

func (u *updateNotifier[T]) Refresh(ctx context.Context) {
u.channelsMu.Lock()
for len(u.updateChannels) == 0 {
Expand Down Expand Up @@ -104,6 +114,8 @@ func (u *updateNotifier[T]) Refresh(ctx context.Context) {
return
}
u.latest = groups

u.gc()
u.channelsMu.Lock()
cloned := CloneList(u.latest)
for _, c := range u.updateChannels {
Expand Down
11 changes: 5 additions & 6 deletions pkg/util/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ var _ = Describe("Update Notifier", Label("unit"), func() {

groups := atomic.Value[[]rules.RuleGroup]{}
groups.Store(testGroups1)
un := notifier.NewUpdateNotifier(mock_rules.NewTestFinder(ctrl, func() []rules.RuleGroup {
updateNotifier := notifier.NewUpdateNotifier(mock_rules.NewTestFinder(ctrl, func() []rules.RuleGroup {
return notifier.CloneList(groups.Load())
}))

Expand All @@ -120,22 +120,21 @@ var _ = Describe("Update Notifier", Label("unit"), func() {

channels := make([]<-chan []rules.RuleGroup, count)
for i := 0; i < count; i++ {
channels[i] = un.NotifyC(contexts[i].ctx)
channels[i] = updateNotifier.NotifyC(contexts[i].ctx)
}

go un.Refresh(context.Background())
go updateNotifier.Refresh(context.Background())

for i := 0; i < count; i++ {
Eventually(channels[i]).Should(Receive(Equal(testGroups1)))
}

groups.Store(testGroups2)

groups.Store(testGroups2) // cancel the channels
for i := 0; i < count; i++ {
contexts[i].ca()
}

go un.Refresh(context.Background())
go updateNotifier.Refresh(context.Background())

for i := 0; i < count; i++ {
Expect(channels[i]).NotTo(Receive())
Expand Down
11 changes: 6 additions & 5 deletions plugins/alerting/pkg/alerting/alarms/v1/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@ import (
type EvaluatorContext struct {
Ctx context.Context
Cancel context.CancelFunc
running atomic.Bool
running *atomic.Bool
}

type Runner struct {
// conditionId -> subsriber pull context cancel func
systemConditionUpdateListeners map[string]EvaluatorContext
systemConditionMu sync.Mutex
systemConditionUpdateListeners map[string]*EvaluatorContext
systemConditionMu *sync.Mutex
}

func NewRunner() *Runner {
return &Runner{
systemConditionUpdateListeners: make(map[string]EvaluatorContext),
systemConditionUpdateListeners: make(map[string]*EvaluatorContext),
systemConditionMu: &sync.Mutex{},
}
}

func (n *Runner) AddSystemConfigListener(conditionId string, eCtx EvaluatorContext) {
func (n *Runner) AddSystemConfigListener(conditionId string, eCtx *EvaluatorContext) {
n.systemConditionMu.Lock()
defer n.systemConditionMu.Unlock()
if oldContext, ok := n.systemConditionUpdateListeners[conditionId]; ok {
Expand Down
22 changes: 13 additions & 9 deletions plugins/alerting/pkg/alerting/alarms/v1/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1"
Expand Down Expand Up @@ -156,9 +157,10 @@ func (p *AlarmServerComponent) onSystemConditionCreate(conditionId, conditionNam
defer cancel() // cancel parent context, if we return (non-recoverable)
evaluator.EvaluateLoop()
}()
p.runner.AddSystemConfigListener(conditionId, EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
running: &atomic.Bool{},
})
return nil
}
Expand Down Expand Up @@ -254,9 +256,10 @@ func (p *AlarmServerComponent) onDownstreamCapabilityConditionCreate(conditionId
defer cancel() // cancel parent context, if we return (non-recoverable)
evaluator.EvaluateLoop()
}()
p.runner.AddSystemConfigListener(conditionId, EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
running: &atomic.Bool{},
})
return nil
}
Expand Down Expand Up @@ -554,9 +557,10 @@ func (p *AlarmServerComponent) onCortexClusterStatusCreate(conditionId, conditio
defer cancel() // cancel parent context, if we return (non-recoverable)
evaluator.EvaluateLoop()
}()
p.runner.AddSystemConfigListener(conditionId, EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
p.runner.AddSystemConfigListener(conditionId, &EvaluatorContext{
Ctx: evaluator.evaluationCtx,
Cancel: evaluator.cancelEvaluation,
running: &atomic.Bool{},
})
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/alerting/pkg/alerting/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (p *Plugin) handleDriverNotifications() {
p.logger.Info("shutting down cluster driver update handler")
return
case client := <-p.clusterNotifier:
p.logger.Info(fmt.Sprintf("updating alerting client based on cluster status : %v", client))
p.logger.Info("updating alerting client based on cluster status")
serverCfg := server.Config{
Client: client.Clone(),
}
Expand Down
Loading

0 comments on commit 41dae0d

Please sign in to comment.