Skip to content

Commit

Permalink
Fix a couple run groups not exiting (#1272)
Browse files Browse the repository at this point in the history
  • Loading branch information
RebeccaMahany authored Aug 1, 2023
1 parent efbbb00 commit 9fab2cf
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 9 deletions.
6 changes: 6 additions & 0 deletions cmd/launcher/internal/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ func NewUpdater(
return nil, err
}

ctx, cancel := context.WithCancel(ctx)

updateCmd := &updaterCmd{
updater: updater,
ctx: ctx,
cancel: cancel,
stopChan: make(chan bool),
config: config,
runUpdaterRetryInterval: 30 * time.Minute,
Expand All @@ -84,6 +87,7 @@ type updater interface {
type updaterCmd struct {
updater updater
ctx context.Context
cancel context.CancelFunc
stopChan chan bool
stopExecution func()
config *UpdaterConfig
Expand Down Expand Up @@ -155,4 +159,6 @@ func (u *updaterCmd) interrupt(err error) {
if u.stopExecution != nil {
u.stopExecution()
}

u.cancel()
}
5 changes: 5 additions & 0 deletions cmd/launcher/internal/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func Test_updaterCmd_execute(t *testing.T) {
u := &updaterCmd{
updater: tt.fields.updater,
ctx: ctx,
cancel: cancelCtx,
stopChan: tt.fields.stopChan,
config: tt.fields.config,
runUpdaterRetryInterval: tt.fields.runUpdaterRetryInterval,
Expand Down Expand Up @@ -194,9 +195,13 @@ func Test_updaterCmd_interrupt(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())

u := &updaterCmd{
stopChan: tt.fields.stopChan,
config: tt.fields.config,
ctx: ctx,
cancel: cancel,
}

// using this wait group to ensure that something gets received on u.StopChan
Expand Down
12 changes: 6 additions & 6 deletions pkg/autoupdate/tuf/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewTufAutoupdater(k types.Knapsack, metadataHttpClient *http.Client, mirror
checkInterval: k.AutoupdateInterval(),
store: k.AutoupdateErrorsStore(),
osquerier: osquerier,
osquerierRetryInterval: 1 * time.Minute,
osquerierRetryInterval: 30 * time.Second,
logger: log.NewNopLogger(),
}

Expand Down Expand Up @@ -184,10 +184,10 @@ func (ta *TufAutoupdater) Execute() (err error) {
}

func (ta *TufAutoupdater) Interrupt(_ error) {
ta.interrupt <- struct{}{}
if err := ta.libraryManager.Close(); err != nil {
level.Debug(ta.logger).Log("msg", "could not close library on interrupt", "err", err)
}
ta.interrupt <- struct{}{}
}

// tidyLibrary gets the current running version for each binary (so that the current version is not removed)
Expand Down Expand Up @@ -306,6 +306,10 @@ func (ta *TufAutoupdater) downloadUpdate(binary autoupdatableBinary, targets dat
return "", fmt.Errorf("could not find release: %w", err)
}

if ta.libraryManager.Available(binary, release) {
return "", nil
}

// Get the current running version if available -- don't error out if we can't
// get it, since the worst case is that we download an update whose version matches
// our install version.
Expand All @@ -315,10 +319,6 @@ func (ta *TufAutoupdater) downloadUpdate(binary autoupdatableBinary, targets dat
return "", nil
}

if ta.libraryManager.Available(binary, release) {
return "", nil
}

if err := ta.libraryManager.AddToLibrary(binary, currentVersion, release, releaseMetadata); err != nil {
return "", fmt.Errorf("could not add release %s for binary %s to library: %w", release, binary, err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/autoupdate/tuf/library_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func newUpdateLibraryManager(mirrorUrl string, mirrorClient *http.Client, baseDi

// Close cleans up the temporary staging directory
func (ulm *updateLibraryManager) Close() error {
// Acquire lock to ensure we aren't interrupting an ongoing operation
for _, binary := range binaries {
ulm.lock.Lock(binary)
defer ulm.lock.Unlock(binary)
}

if err := os.RemoveAll(ulm.stagingDir); err != nil {
return fmt.Errorf("could not remove staging dir %s: %w", ulm.stagingDir, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sendbuffer/sendbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (sb *SendBuffer) Run(ctx context.Context) error {
case <-ticker.C:
continue
case <-ctx.Done():
break
return nil
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/traces/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type TraceExporter struct {
disableIngestTLS bool
enabled bool
traceSamplingRate float64
interrupt chan struct{}
}

// NewTraceExporter sets up our traces to be exported via OTLP over HTTP.
Expand Down Expand Up @@ -86,6 +87,7 @@ func NewTraceExporter(ctx context.Context, k types.Knapsack, client osquery.Quer
disableIngestTLS: k.DisableTraceIngestTLS(),
enabled: k.ExportTraces(),
traceSamplingRate: k.TraceSamplingRate(),
interrupt: make(chan struct{}),
}

// Observe ExportTraces and IngestServerURL changes to know when to start/stop exporting, and where
Expand Down Expand Up @@ -248,15 +250,16 @@ func (t *TraceExporter) setNewGlobalProvider() {
// Execute is a no-op -- the exporter is already running in the background. The TraceExporter
// otherwise only responds to control server events.
func (t *TraceExporter) Execute() error {
// Does nothing, just waiting for launcher to exit
<-context.Background().Done()
<-t.interrupt
return nil
}

func (t *TraceExporter) Interrupt(_ error) {
if t.provider != nil {
t.provider.Shutdown(context.Background())
}

t.interrupt <- struct{}{}
}

// Update satisfies control.subscriber interface -- looks at changes to the `observability_ingest` subsystem,
Expand Down

0 comments on commit 9fab2cf

Please sign in to comment.