diff --git a/cmd/mtail/main.go b/cmd/mtail/main.go index 9d8a0183b..b3dc7b0a9 100644 --- a/cmd/mtail/main.go +++ b/cmd/mtail/main.go @@ -11,7 +11,6 @@ import ( "os/signal" "runtime" "strings" - "sync" "syscall" "time" @@ -249,23 +248,22 @@ func main() { if *oneShot { switch *oneShotFormat { case "prometheus": - var wg sync.WaitGroup - e, err := exporter.New(ctx, &wg, store, eOpts...) + e, err := exporter.New(ctx, store, eOpts...) if err != nil { glog.Error(err) cancel() - wg.Wait() + e.Stop() os.Exit(1) //nolint:gocritic // false positive } err = e.Write(os.Stdout) if err != nil { glog.Error(err) cancel() - wg.Wait() + e.Stop() os.Exit(1) //nolint:gocritic // false positive } cancel() - wg.Wait() + e.Stop() os.Exit(0) //nolint:gocritic // false positive case "json": err = store.WriteMetrics(os.Stdout) diff --git a/internal/exporter/export.go b/internal/exporter/export.go index 55bcf873b..1825974fd 100644 --- a/internal/exporter/export.go +++ b/internal/exporter/export.go @@ -31,6 +31,7 @@ var ( // Exporter manages the export of metrics to passive and active collectors. type Exporter struct { ctx context.Context + cancelFunc context.CancelFunc wg sync.WaitGroup store *metrics.Store pushInterval time.Duration @@ -40,6 +41,7 @@ type Exporter struct { exportDisabled bool pushTargets []pushOptions initDone chan struct{} + shutdownDone chan struct{} } // Option configures a new Exporter. @@ -83,24 +85,19 @@ func DisableExport() Option { } } -var ( - ErrNeedsStore = errors.New("exporter needs a Store") - ErrNeedsWaitgroup = errors.New("exporter needs a WaitGroup") -) +var ErrNeedsStore = errors.New("exporter needs a Store") // New creates a new Exporter. -func New(ctx context.Context, wg *sync.WaitGroup, store *metrics.Store, options ...Option) (*Exporter, error) { +func New(ctx context.Context, store *metrics.Store, options ...Option) (*Exporter, error) { if store == nil { return nil, ErrNeedsStore } - if wg == nil { - return nil, ErrNeedsWaitgroup - } e := &Exporter{ - ctx: ctx, - store: store, - initDone: make(chan struct{}), + store: store, + initDone: make(chan struct{}), + shutdownDone: make(chan struct{}), } + e.ctx, e.cancelFunc = context.WithCancel(ctx) defer close(e.initDone) if err := e.SetOption(options...); err != nil { return nil, err @@ -128,20 +125,25 @@ func New(ctx context.Context, wg *sync.WaitGroup, store *metrics.Store, options } e.StartMetricPush() - wg.Add(1) // This routine manages shutdown of the Exporter. go func() { - defer wg.Done() <-e.initDone // Wait for the context to be completed before waiting for subroutines. if !e.exportDisabled { <-e.ctx.Done() } e.wg.Wait() + close(e.shutdownDone) }() return e, nil } +// Stop instructs the exporter to shut down. The function returns once the exporter has finished. +func (e *Exporter) Stop() { + e.cancelFunc() + <-e.shutdownDone +} + // SetOption takes one or more option functions and applies them in order to Exporter. func (e *Exporter) SetOption(options ...Option) error { for _, option := range options { diff --git a/internal/exporter/export_test.go b/internal/exporter/export_test.go index c0a319117..317cbba06 100644 --- a/internal/exporter/export_test.go +++ b/internal/exporter/export_test.go @@ -9,7 +9,6 @@ import ( "reflect" "sort" "strings" - "sync" "testing" "time" @@ -22,37 +21,26 @@ const prefix = "prefix" func TestCreateExporter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup + defer cancel() store := metrics.NewStore() - _, err := New(ctx, &wg, store) + + e, err := New(ctx, store) if err != nil { - t.Errorf("New(ctx, wg, store) unexpected error: %v", err) + t.Errorf("New(ctx, store) unexpected error: %v", err) } - cancel() - wg.Wait() - ctx, cancel = context.WithCancel(context.Background()) + e.Stop() + failopt := func(*Exporter) error { return errors.New("busted") // nolint:goerr113 } - _, err = New(ctx, &wg, store, failopt) + _, err = New(ctx, store, failopt) if err == nil { - t.Errorf("unexpected success") + t.Error("New(ctx, store, fail) -> unexpected success") } - cancel() - wg.Wait() -} -func TestNewErrors(t *testing.T) { - ctx := context.Background() - store := metrics.NewStore() - var wg sync.WaitGroup - _, err := New(ctx, nil, store) - if err == nil { - t.Error("New(ctx, nil, store) expecting error, received nil") - } - _, err = New(ctx, &wg, nil) + _, err = New(ctx, nil) if err == nil { - t.Error("New(ctx, wg, nil) expecting error, received nil") + t.Error("New(ctx, nil) -> nil, expecting error") } } diff --git a/internal/exporter/graphite_test.go b/internal/exporter/graphite_test.go index 9e5b175da..b39cdf97d 100644 --- a/internal/exporter/graphite_test.go +++ b/internal/exporter/graphite_test.go @@ -47,12 +47,13 @@ func TestHandleGraphite(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup + defer cancel() + ms := metrics.NewStore() for _, metric := range tc.metrics { testutil.FatalIfErr(t, ms.Add(metric)) } - e, err := New(ctx, &wg, ms, Hostname("gunstar")) + e, err := New(ctx, ms, Hostname("gunstar")) testutil.FatalIfErr(t, err) response := httptest.NewRecorder() e.HandleGraphite(response, &http.Request{}) @@ -64,8 +65,7 @@ func TestHandleGraphite(t *testing.T) { t.Errorf("failed to read response %s", err) } testutil.ExpectNoDiff(t, tc.expected, string(b), testutil.IgnoreUnexported(sync.RWMutex{})) - cancel() - wg.Wait() + e.Stop() }) } } diff --git a/internal/exporter/json_test.go b/internal/exporter/json_test.go index c7e61c1d7..3235f870f 100644 --- a/internal/exporter/json_test.go +++ b/internal/exporter/json_test.go @@ -141,12 +141,13 @@ func TestHandleJSON(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup + defer cancel() + ms := metrics.NewStore() for _, metric := range tc.metrics { testutil.FatalIfErr(t, ms.Add(metric)) } - e, err := New(ctx, &wg, ms, Hostname("gunstar")) + e, err := New(ctx, ms, Hostname("gunstar")) testutil.FatalIfErr(t, err) response := httptest.NewRecorder() e.HandleJSON(response, &http.Request{}) @@ -158,8 +159,8 @@ func TestHandleJSON(t *testing.T) { t.Errorf("failed to read response: %s", err) } testutil.ExpectNoDiff(t, tc.expected, string(b), testutil.IgnoreUnexported(sync.RWMutex{})) - cancel() - wg.Wait() + + e.Stop() }) } } diff --git a/internal/exporter/prometheus_test.go b/internal/exporter/prometheus_test.go index 4313be951..e86f14d3b 100644 --- a/internal/exporter/prometheus_test.go +++ b/internal/exporter/prometheus_test.go @@ -8,7 +8,6 @@ import ( "context" "math" "strings" - "sync" "testing" "time" @@ -255,8 +254,9 @@ func TestHandlePrometheus(t *testing.T) { for _, tc := range handlePrometheusTests { tc := tc t.Run(tc.name, func(t *testing.T) { - var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ms := metrics.NewStore() for _, metric := range tc.metrics { testutil.FatalIfErr(t, ms.Add(metric)) @@ -267,14 +267,13 @@ func TestHandlePrometheus(t *testing.T) { if !tc.progLabel { opts = append(opts, OmitProgLabel()) } - e, err := New(ctx, &wg, ms, opts...) + e, err := New(ctx, ms, opts...) testutil.FatalIfErr(t, err) r := strings.NewReader(tc.expected) if err = promtest.CollectAndCompare(e, r); err != nil { t.Error(err) } - cancel() - wg.Wait() + e.Stop() }) } } @@ -334,8 +333,9 @@ func TestWritePrometheus(t *testing.T) { for _, tc := range writePrometheusTests { tc := tc t.Run(tc.name, func(t *testing.T) { - var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ms := metrics.NewStore() for _, metric := range tc.metrics { testutil.FatalIfErr(t, ms.Add(metric)) @@ -344,7 +344,7 @@ func TestWritePrometheus(t *testing.T) { Hostname("gunstar"), OmitProgLabel(), } - e, err := New(ctx, &wg, ms, opts...) + e, err := New(ctx, ms, opts...) testutil.FatalIfErr(t, err) var buf bytes.Buffer @@ -352,8 +352,7 @@ func TestWritePrometheus(t *testing.T) { testutil.FatalIfErr(t, err) testutil.ExpectNoDiff(t, tc.expected, buf.String()) - cancel() - wg.Wait() + e.Stop() }) } } diff --git a/internal/exporter/varz_test.go b/internal/exporter/varz_test.go index bc168e1d4..9026be7be 100644 --- a/internal/exporter/varz_test.go +++ b/internal/exporter/varz_test.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "net/http/httptest" - "sync" "testing" "time" @@ -73,13 +72,14 @@ func TestHandleVarz(t *testing.T) { for _, tc := range handleVarzTests { tc := tc t.Run(tc.name, func(t *testing.T) { - var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ms := metrics.NewStore() for _, metric := range tc.metrics { testutil.FatalIfErr(t, ms.Add(metric)) } - e, err := New(ctx, &wg, ms, Hostname("gunstar")) + e, err := New(ctx, ms, Hostname("gunstar")) testutil.FatalIfErr(t, err) response := httptest.NewRecorder() e.HandleVarz(response, &http.Request{}) @@ -91,8 +91,8 @@ func TestHandleVarz(t *testing.T) { t.Errorf("failed to read response: %s", err) } testutil.ExpectNoDiff(t, tc.expected, string(b)) - cancel() - wg.Wait() + + e.Stop() }) } } diff --git a/internal/mtail/mtail.go b/internal/mtail/mtail.go index 17b6c49cd..91750e547 100644 --- a/internal/mtail/mtail.go +++ b/internal/mtail/mtail.go @@ -66,7 +66,7 @@ func (m *Server) initRuntime() (err error) { // initExporter sets up an Exporter for this Server. func (m *Server) initExporter() (err error) { - m.e, err = exporter.New(m.ctx, &m.wg, m.store, m.eOpts...) + m.e, err = exporter.New(m.ctx, m.store, m.eOpts...) if err != nil { return err } @@ -234,6 +234,7 @@ func (m *Server) SetOption(options ...Option) error { // TODO(jaq): remove this once the test server is able to trigger polls on the components. func (m *Server) Run() error { m.wg.Wait() + m.e.Stop() if m.compileOnly { glog.Info("compile-only is set, exiting") return nil