Wait for the old CRD Manager to stop before starting a new one (#1778)

* Wait for the old CRD Manager to stop before starting a new one
* Write a test
* Let the test wait for Run to be called first
* Rename wait group
* Add comment

ptodev authored Nov 1, 2024
1 parent ee4ae44 commit 0b41558
Showing 4 changed files with 169 additions and 9 deletions.
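
Before the diffs, here is the shape of the change in isolation: `Component.Run` now cancels the old CRD manager and waits for its goroutine to exit before launching a new one. The sketch below is a minimal, self-contained restatement of that pattern with simplified, hypothetical names (`runLoop`, `manager`, `fakeManager`); it illustrates the idea only, and the component's real code follows in the `component.go` diff.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// manager is a stand-in for the crdManagerInterface introduced in this commit,
// trimmed to the one method the sketch needs.
type manager interface {
	Run(ctx context.Context) error
}

type fakeManager struct{ id int }

func (m fakeManager) Run(ctx context.Context) error {
	fmt.Printf("manager %d started\n", m.id)
	<-ctx.Done()
	fmt.Printf("manager %d stopped\n", m.id)
	return nil
}

// runLoop sketches the cancel-then-wait restart loop: on every update, stop
// the previous manager and wait for its goroutine to finish before starting
// the next one, so the two never overlap.
func runLoop(ctx context.Context, onUpdate <-chan struct{}, newManager func() manager) {
	var wg sync.WaitGroup
	defer wg.Wait() // also wait for the last manager on shutdown

	var innerCtx context.Context
	var cancel context.CancelFunc
	for {
		select {
		case <-ctx.Done():
			if cancel != nil {
				cancel()
			}
			return
		case <-onUpdate:
			m := newManager()

			// Stop the old manager first; starting the new one while the old
			// is still running is what led to the duplicate metrics
			// registration error this commit fixes.
			if cancel != nil {
				cancel()
			}
			wg.Wait()

			innerCtx, cancel = context.WithCancel(ctx)
			wg.Add(1)
			go func() {
				defer wg.Done()
				_ = m.Run(innerCtx)
			}()
		}
	}
}

func main() {
	updates := make(chan struct{}, 2)
	updates <- struct{}{}
	updates <- struct{}{}

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()

	id := 0
	runLoop(ctx, updates, func() manager {
		id++
		return fakeManager{id: id}
	})
}
```

The same `sync.WaitGroup` guards both the per-update restart and the final shutdown (`defer wg.Wait()`), which is also what the new test below exercises by injecting a factory whose manager hangs in `Run`.
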
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -56,6 +56,9 @@ Main (unreleased)

- Do not log error on clean shutdown of `loki.source.journal`. (@thampiotr)

- `prometheus.operator.*` components: Fixed a bug which would sometimes cause a
"failed to create service discovery refresh metrics" error after a config reload. (@ptodev)

### Other changes

- Small fix in UI stylesheet to fit more content into visible table area. (@defanator)
29 changes: 21 additions & 8 deletions internal/component/prometheus/operator/common/component.go
@@ -19,14 +19,16 @@ import (
type Component struct {
mut sync.RWMutex
config *operator.Arguments
manager *crdManager
manager crdManagerInterface
ls labelstore.LabelStore

onUpdate chan struct{}
opts component.Options
healthMut sync.RWMutex
health component.Health

crdManagerFactory crdManagerFactory

kind string
cluster cluster.Cluster
}
@@ -44,11 +46,12 @@ func New(o component.Options, args component.Arguments, kind string) (*Component
}
ls := service.(labelstore.LabelStore)
c := &Component{
opts: o,
onUpdate: make(chan struct{}, 1),
kind: kind,
cluster: clusterData,
ls: ls,
opts: o,
onUpdate: make(chan struct{}, 1),
kind: kind,
cluster: clusterData,
ls: ls,
crdManagerFactory: realCrdManagerFactory{},
}
return c, c.Update(args)
}
@@ -74,6 +77,8 @@ func (c *Component) Run(ctx context.Context) error {

c.reportHealth(nil)
errChan := make(chan error, 1)
wg := sync.WaitGroup{}
defer wg.Wait()
for {
select {
case <-ctx.Done():
@@ -85,17 +90,25 @@
c.reportHealth(err)
case <-c.onUpdate:
c.mut.Lock()
manager := newCrdManager(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls)
manager := c.crdManagerFactory.New(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls)
c.manager = manager

// Wait for the old manager to stop.
// If we start the new manager before stopping the old one,
// the new manager might not be able to register its debug metrics due to a duplicate registration error.
if cancel != nil {
cancel()
}
wg.Wait()

innerCtx, cancel = context.WithCancel(ctx)
wg.Add(1)
go func() {
if err := manager.Run(innerCtx); err != nil {
level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err)
errChan <- err
}
wg.Done()
}()
c.mut.Unlock()
}
@@ -170,7 +183,7 @@ func (c *Component) Handler() http.Handler {
}
ns := parts[1]
name := parts[2]
scs := man.getScrapeConfig(ns, name)
scs := man.GetScrapeConfig(ns, name)
if len(scs) == 0 {
w.WriteHeader(404)
return
127 changes: 127 additions & 0 deletions internal/component/prometheus/operator/common/component_test.go
@@ -0,0 +1,127 @@
package common

import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/prometheus/operator"
"github.com/grafana/alloy/internal/service/cluster"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

type crdManagerFactoryHungRun struct {
stopRun chan struct{}
}

func (m crdManagerFactoryHungRun) New(_ component.Options, _ cluster.Cluster, _ log.Logger,
_ *operator.Arguments, _ string, _ labelstore.LabelStore) crdManagerInterface {
return &crdManagerHungRun{
stopRun: m.stopRun,
}
}

type crdManagerHungRun struct {
stopRun chan struct{}
}

func (c *crdManagerHungRun) Run(ctx context.Context) error {
<-ctx.Done()
<-c.stopRun
return nil
}

func (c *crdManagerHungRun) ClusteringUpdated() {}

func (c *crdManagerHungRun) DebugInfo() interface{} {
return nil
}

func (c *crdManagerHungRun) GetScrapeConfig(ns, name string) []*config.ScrapeConfig {
return nil
}

func TestRunExit(t *testing.T) {
opts := component.Options{
Logger: util.TestAlloyLogger(t),
Registerer: prometheus.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
switch name {
case http_service.ServiceName:
return http_service.Data{
HTTPListenAddr: "localhost:12345",
MemoryListenAddr: "alloy.internal:1245",
BaseHTTPPath: "/",
DialFunc: (&net.Dialer{}).DialContext,
}, nil

case cluster.ServiceName:
return cluster.Mock(), nil
case labelstore.ServiceName:
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("service %q does not exist", name)
}
},
}

nilReceivers := []storage.Appendable{nil, nil}

var args operator.Arguments
args.SetToDefault()
args.ForwardTo = nilReceivers

// Create a Component
c, err := New(opts, args, "")
require.NoError(t, err)

stopRun := make(chan struct{})
c.crdManagerFactory = crdManagerFactoryHungRun{
stopRun: stopRun,
}

// Run the component
ctx, cancelFunc := context.WithCancel(context.Background())
cmpRunExited := atomic.Bool{}
cmpRunExited.Store(false)

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
err := c.Run(ctx)
require.NoError(t, err)
cmpRunExited.Store(true)
}()
// Wait until the component.Run goroutine starts
// The test can be flaky without this.
wg.Wait()

// Stop the component.
// It shouldn't stop immediately, because the CRD Manager is hung.
cancelFunc()
time.Sleep(5 * time.Second)
if cmpRunExited.Load() {
require.Fail(t, "component.Run exited")
}

// Make crdManager.Run exit
close(stopRun)

// Make sure component.Run exits
require.Eventually(t, func() bool {
return cmpRunExited.Load()
}, 5*time.Second, 100*time.Millisecond, "component.Run didn't exit")
}
19 changes: 18 additions & 1 deletion internal/component/prometheus/operator/common/crdmanager.go
@@ -40,6 +40,23 @@ import (
// Generous timeout period for configuring all informers
const informerSyncTimeout = 10 * time.Second

type crdManagerInterface interface {
Run(ctx context.Context) error
ClusteringUpdated()
DebugInfo() interface{}
GetScrapeConfig(ns, name string) []*config.ScrapeConfig
}

type crdManagerFactory interface {
New(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string, ls labelstore.LabelStore) crdManagerInterface
}

type realCrdManagerFactory struct{}

func (realCrdManagerFactory) New(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string, ls labelstore.LabelStore) crdManagerInterface {
return newCrdManager(opts, cluster, logger, args, kind, ls)
}

// crdManager is all of the fields required to run a crd based component.
// on update, this entire thing should be recreated and restarted
type crdManager struct {
@@ -237,7 +254,7 @@ func (c *crdManager) DebugInfo() interface{} {
return info
}

func (c *crdManager) getScrapeConfig(ns, name string) []*config.ScrapeConfig {
func (c *crdManager) GetScrapeConfig(ns, name string) []*config.ScrapeConfig {
prefix := fmt.Sprintf("%s/%s/%s", c.kind, ns, name)
matches := []*config.ScrapeConfig{}
for k, v := range c.scrapeConfigs {
