diff --git a/server/service/campaigns.go b/server/service/campaigns.go index efa82fbf8902..cba57bd04eaf 100644 --- a/server/service/campaigns.go +++ b/server/service/campaigns.go @@ -156,17 +156,17 @@ func (svc *Service) NewDistributedQueryCampaign(ctx context.Context, queryString } } - err = svc.liveQueryStore.RunQuery(strconv.Itoa(int(campaign.ID)), queryString, hostIDs) - if err != nil { - return nil, ctxerr.Wrap(ctx, err, "run query") - } - // Metrics are used for total hosts targeted for the activity feed. campaign.Metrics, err = svc.ds.CountHostsInTargets(ctx, filter, targets, time.Now()) if err != nil { return nil, ctxerr.Wrap(ctx, err, "counting hosts") } + err = svc.liveQueryStore.RunQuery(strconv.Itoa(int(campaign.ID)), queryString, hostIDs) + if err != nil { + return nil, ctxerr.Wrap(ctx, err, "run query") + } + return campaign, nil } diff --git a/server/service/live_queries.go b/server/service/live_queries.go index 7bbe6a79b18a..11c46457b56e 100644 --- a/server/service/live_queries.go +++ b/server/service/live_queries.go @@ -263,27 +263,30 @@ func (svc *Service) RunLiveQueryDeadline( queryIDPtr = nil queryString = query } + campaign, err := svc.NewDistributedQueryCampaign(ctx, queryString, queryIDPtr, fleet.HostTargets{HostIDs: hostIDs}) if err != nil { + level.Error(svc.logger).Log( + "msg", "new distributed query campaign", + "queryString", queryString, + "queryID", queryID, + "err", err, + ) resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err} return } queryID = campaign.QueryID - readChan, cancelFunc, err := svc.GetCampaignReader(ctx, campaign) - if err != nil { - resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err} - return - } - defer cancelFunc() - + // We do not want to use the outer `ctx` directly because we want to cleanup the campaign + // even if the outer `ctx` is canceled (e.g. a client terminating the connection). + // Also, we make sure stats and activity DB operations don't get killed after we return results. + ctxWithoutCancel := context.WithoutCancel(ctx) defer func() { - // We do not want to use the outer `ctx` directly because we want to cleanup the campaign - // even if the outer `ctx` is canceled (e.g. a client terminating the connection). - ctx := context.WithoutCancel(ctx) - err := svc.CompleteCampaign(ctx, campaign) + err := svc.CompleteCampaign(ctxWithoutCancel, campaign) if err != nil { - level.Error(svc.logger).Log("msg", "completing campaign (sync)", "query.id", campaign.QueryID, "err", err) + level.Error(svc.logger).Log( + "msg", "completing campaign (sync)", "query.id", campaign.QueryID, "campaign.id", campaign.ID, "err", err, + ) resultsCh <- fleet.QueryCampaignResult{ QueryID: queryID, Error: ptr.String(err.Error()), @@ -292,6 +295,16 @@ func (svc *Service) RunLiveQueryDeadline( } }() + readChan, cancelFunc, err := svc.GetCampaignReader(ctx, campaign) + if err != nil { + level.Error(svc.logger).Log( + "msg", "get campaign reader", "query.id", campaign.QueryID, "campaign.id", campaign.ID, "err", err, + ) + resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err} + return + } + defer cancelFunc() + var results []fleet.QueryResult timeout := time.After(deadline) @@ -305,8 +318,6 @@ func (svc *Service) RunLiveQueryDeadline( level.Error(svc.logger).Log("msg", "error checking saved query", "query.id", campaign.QueryID, "err", err) perfStatsTracker.saveStats = false } - // to make sure stats and activity DB operations don't get killed after we return results. - ctxWithoutCancel := context.WithoutCancel(ctx) totalHosts := campaign.Metrics.TotalHosts // We update aggregated stats and activity at the end asynchronously. defer func() {