From d4b89c17d10d2979c41e126c4a881015ba3647c1 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Fri, 8 Dec 2023 20:56:00 +0800 Subject: [PATCH] fix: span query read path (#2822) --- pkg/phlaredb/block_querier.go | 17 ++++++++--------- pkg/querier/querier.go | 6 +++--- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 468a92132b..f2ab0ab2e9 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -975,6 +975,7 @@ func MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.M Type: request.Type, Start: request.Start, End: request.End, + Hints: request.Hints, }, queriers) if err != nil { return err @@ -1008,13 +1009,13 @@ func MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.M return nil })) } - } - // Signals the end of the profile streaming by sending an empty response. - // This allows the client to not block other streaming ingesters. - sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming")) - if err = stream.Send(&ingestv1.MergeSpanProfileResponse{}); err != nil { - return err + // Signals the end of the profile streaming by sending an empty response. + // This allows the client to not block other streaming ingesters. + sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming")) + if err = stream.Send(&ingestv1.MergeSpanProfileResponse{}); err != nil { + return err + } } if err = g.Wait(); err != nil { @@ -1032,11 +1033,9 @@ func MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.M otlog.String("msg", "sending the final result to the client"), otlog.Int("tree_bytes", len(treeBytes)), ) - - sp.LogFields(otlog.String("msg", "sending the final result to the client")) err = stream.Send(&ingestv1.MergeSpanProfileResponse{ Result: &ingestv1.MergeSpanProfileResult{ - TreeBytes: buf.Bytes(), + TreeBytes: treeBytes, }, }) if err != nil { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index de35b05223..aa1cdd4ff8 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1062,17 +1062,17 @@ func (q *Querier) selectSpanProfile(ctx context.Context, req *querierv1.SelectMe return q.selectSpanProfileFromIngesters(ctx, req, plan) } - storeQueries := splitQueryToStores(model.Time(req.Start), model.Time(req.End), model.Now(), q.cfg.QueryStoreAfter, nil) + storeQueries := splitQueryToStores(model.Time(req.Start), model.Time(req.End), model.Now(), q.cfg.QueryStoreAfter, plan) if !storeQueries.ingester.shouldQuery && !storeQueries.storeGateway.shouldQuery { return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("start and end time are outside of the ingester and store gateway retention")) } storeQueries.Log(level.Debug(spanlogger.FromContext(ctx, q.logger))) - if !storeQueries.ingester.shouldQuery { + if plan == nil && !storeQueries.ingester.shouldQuery { return q.selectSpanProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeSpanProfileRequest(req), plan) } - if !storeQueries.storeGateway.shouldQuery { + if plan == nil && !storeQueries.storeGateway.shouldQuery { return q.selectSpanProfileFromIngesters(ctx, storeQueries.ingester.MergeSpanProfileRequest(req), plan) }