Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a couple run groups not exiting #1272

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/launcher/internal/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ func NewUpdater(
return nil, err
}

ctx, cancel := context.WithCancel(ctx)

updateCmd := &updaterCmd{
updater: updater,
ctx: ctx,
cancel: cancel,
stopChan: make(chan bool),
config: config,
runUpdaterRetryInterval: 30 * time.Minute,
Expand All @@ -84,6 +87,7 @@ type updater interface {
type updaterCmd struct {
updater updater
ctx context.Context
cancel context.CancelFunc
stopChan chan bool
stopExecution func()
config *UpdaterConfig
Expand Down Expand Up @@ -155,4 +159,6 @@ func (u *updaterCmd) interrupt(err error) {
if u.stopExecution != nil {
u.stopExecution()
}

u.cancel()
}
5 changes: 5 additions & 0 deletions cmd/launcher/internal/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func Test_updaterCmd_execute(t *testing.T) {
u := &updaterCmd{
updater: tt.fields.updater,
ctx: ctx,
cancel: cancelCtx,
stopChan: tt.fields.stopChan,
config: tt.fields.config,
runUpdaterRetryInterval: tt.fields.runUpdaterRetryInterval,
Expand Down Expand Up @@ -194,9 +195,13 @@ func Test_updaterCmd_interrupt(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())

u := &updaterCmd{
stopChan: tt.fields.stopChan,
config: tt.fields.config,
ctx: ctx,
cancel: cancel,
}

// using this wait group to ensure that something gets received on u.StopChan
Expand Down
12 changes: 6 additions & 6 deletions pkg/autoupdate/tuf/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewTufAutoupdater(k types.Knapsack, metadataHttpClient *http.Client, mirror
checkInterval: k.AutoupdateInterval(),
store: k.AutoupdateErrorsStore(),
osquerier: osquerier,
osquerierRetryInterval: 1 * time.Minute,
osquerierRetryInterval: 30 * time.Second,
logger: log.NewNopLogger(),
}

Expand Down Expand Up @@ -184,10 +184,10 @@ func (ta *TufAutoupdater) Execute() (err error) {
}

func (ta *TufAutoupdater) Interrupt(_ error) {
ta.interrupt <- struct{}{}
if err := ta.libraryManager.Close(); err != nil {
level.Debug(ta.logger).Log("msg", "could not close library on interrupt", "err", err)
}
ta.interrupt <- struct{}{}
}

// tidyLibrary gets the current running version for each binary (so that the current version is not removed)
Expand Down Expand Up @@ -306,6 +306,10 @@ func (ta *TufAutoupdater) downloadUpdate(binary autoupdatableBinary, targets dat
return "", fmt.Errorf("could not find release: %w", err)
}

if ta.libraryManager.Available(binary, release) {
return "", nil
}

// Get the current running version if available -- don't error out if we can't
// get it, since the worst case is that we download an update whose version matches
// our install version.
Expand All @@ -315,10 +319,6 @@ func (ta *TufAutoupdater) downloadUpdate(binary autoupdatableBinary, targets dat
return "", nil
}

if ta.libraryManager.Available(binary, release) {
return "", nil
}

if err := ta.libraryManager.AddToLibrary(binary, currentVersion, release, releaseMetadata); err != nil {
return "", fmt.Errorf("could not add release %s for binary %s to library: %w", release, binary, err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/autoupdate/tuf/library_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func newUpdateLibraryManager(mirrorUrl string, mirrorClient *http.Client, baseDi

// Close cleans up the temporary staging directory
func (ulm *updateLibraryManager) Close() error {
// Acquire lock to ensure we aren't interrupting an ongoing operation
for _, binary := range binaries {
ulm.lock.Lock(binary)
defer ulm.lock.Unlock(binary)
}

if err := os.RemoveAll(ulm.stagingDir); err != nil {
return fmt.Errorf("could not remove staging dir %s: %w", ulm.stagingDir, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sendbuffer/sendbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (sb *SendBuffer) Run(ctx context.Context) error {
case <-ticker.C:
continue
case <-ctx.Done():
break
return nil
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/traces/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type TraceExporter struct {
disableIngestTLS bool
enabled bool
traceSamplingRate float64
interrupt chan struct{}
}

// NewTraceExporter sets up our traces to be exported via OTLP over HTTP.
Expand Down Expand Up @@ -86,6 +87,7 @@ func NewTraceExporter(ctx context.Context, k types.Knapsack, client osquery.Quer
disableIngestTLS: k.DisableTraceIngestTLS(),
enabled: k.ExportTraces(),
traceSamplingRate: k.TraceSamplingRate(),
interrupt: make(chan struct{}),
}

// Observe ExportTraces and IngestServerURL changes to know when to start/stop exporting, and where
Expand Down Expand Up @@ -248,15 +250,16 @@ func (t *TraceExporter) setNewGlobalProvider() {
// Execute is a no-op -- the exporter is already running in the background. The TraceExporter
// otherwise only responds to control server events.
func (t *TraceExporter) Execute() error {
// Does nothing, just waiting for launcher to exit
<-context.Background().Done()
<-t.interrupt
return nil
}

func (t *TraceExporter) Interrupt(_ error) {
if t.provider != nil {
t.provider.Shutdown(context.Background())
}

t.interrupt <- struct{}{}
}

// Update satisfies control.subscriber interface -- looks at changes to the `observability_ingest` subsystem,
Expand Down
Loading