Skip to content

Commit

Permalink
Moving context.WithoutCancel outside defer. (#17260)
Browse files Browse the repository at this point in the history
#17197

Fixing orphaned live queries when context is canceled

Co-authored-by: Lucas Rodriguez <lucas@fleetdm.com>
  • Loading branch information
2 people authored and sharon-fdm committed Feb 29, 2024
1 parent 105255e commit b953b75
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
10 changes: 5 additions & 5 deletions server/service/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,17 @@ func (svc *Service) NewDistributedQueryCampaign(ctx context.Context, queryString
}
}

err = svc.liveQueryStore.RunQuery(strconv.Itoa(int(campaign.ID)), queryString, hostIDs)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "run query")
}

// Metrics are used for total hosts targeted for the activity feed.
campaign.Metrics, err = svc.ds.CountHostsInTargets(ctx, filter, targets, time.Now())
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "counting hosts")
}

err = svc.liveQueryStore.RunQuery(strconv.Itoa(int(campaign.ID)), queryString, hostIDs)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "run query")
}

return campaign, nil
}

Expand Down
39 changes: 25 additions & 14 deletions server/service/live_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,27 +263,30 @@ func (svc *Service) RunLiveQueryDeadline(
queryIDPtr = nil
queryString = query
}

campaign, err := svc.NewDistributedQueryCampaign(ctx, queryString, queryIDPtr, fleet.HostTargets{HostIDs: hostIDs})
if err != nil {
level.Error(svc.logger).Log(
"msg", "new distributed query campaign",
"queryString", queryString,
"queryID", queryID,
"err", err,
)
resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err}
return
}
queryID = campaign.QueryID

readChan, cancelFunc, err := svc.GetCampaignReader(ctx, campaign)
if err != nil {
resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err}
return
}
defer cancelFunc()

// We do not want to use the outer `ctx` directly because we want to cleanup the campaign
// even if the outer `ctx` is canceled (e.g. a client terminating the connection).
// Also, we make sure stats and activity DB operations don't get killed after we return results.
ctxWithoutCancel := context.WithoutCancel(ctx)
defer func() {
// We do not want to use the outer `ctx` directly because we want to cleanup the campaign
// even if the outer `ctx` is canceled (e.g. a client terminating the connection).
ctx := context.WithoutCancel(ctx)
err := svc.CompleteCampaign(ctx, campaign)
err := svc.CompleteCampaign(ctxWithoutCancel, campaign)
if err != nil {
level.Error(svc.logger).Log("msg", "completing campaign (sync)", "query.id", campaign.QueryID, "err", err)
level.Error(svc.logger).Log(
"msg", "completing campaign (sync)", "query.id", campaign.QueryID, "campaign.id", campaign.ID, "err", err,
)
resultsCh <- fleet.QueryCampaignResult{
QueryID: queryID,
Error: ptr.String(err.Error()),
Expand All @@ -292,6 +295,16 @@ func (svc *Service) RunLiveQueryDeadline(
}
}()

readChan, cancelFunc, err := svc.GetCampaignReader(ctx, campaign)
if err != nil {
level.Error(svc.logger).Log(
"msg", "get campaign reader", "query.id", campaign.QueryID, "campaign.id", campaign.ID, "err", err,
)
resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err}
return
}
defer cancelFunc()

var results []fleet.QueryResult
timeout := time.After(deadline)

Expand All @@ -305,8 +318,6 @@ func (svc *Service) RunLiveQueryDeadline(
level.Error(svc.logger).Log("msg", "error checking saved query", "query.id", campaign.QueryID, "err", err)
perfStatsTracker.saveStats = false
}
// to make sure stats and activity DB operations don't get killed after we return results.
ctxWithoutCancel := context.WithoutCancel(ctx)
totalHosts := campaign.Metrics.TotalHosts
// We update aggregated stats and activity at the end asynchronously.
defer func() {
Expand Down

0 comments on commit b953b75

Please sign in to comment.