@@ -540,26 +540,27 @@ type Profile interface {
540
540
}
541
541
542
542
type Querier interface {
543
+ // BlockID returns the block ID of the querier, when it is representing a single block.
544
+ BlockID () string
543
545
Bounds () (model.Time , model.Time )
546
+ Open (ctx context.Context ) error
547
+ Sort ([]Profile ) []Profile
544
548
545
- SelectMatchingProfiles (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (iter.Iterator [Profile ], error )
546
549
MergeByStacktraces (ctx context.Context , rows iter.Iterator [Profile ]) (* phlaremodel.Tree , error )
547
- SelectMergeByStacktraces (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (* phlaremodel.Tree , error )
548
550
MergeBySpans (ctx context.Context , rows iter.Iterator [Profile ], spans phlaremodel.SpanSelector ) (* phlaremodel.Tree , error )
549
551
MergeByLabels (ctx context.Context , rows iter.Iterator [Profile ], by ... string ) ([]* typesv1.Series , error )
550
- SelectMergeByLabels (ctx context.Context , params * ingestv1.SelectProfilesRequest , by ... string ) ([]* typesv1.Series , error )
551
552
MergePprof (ctx context.Context , rows iter.Iterator [Profile ], maxNodes int64 ) (* profilev1.Profile , error )
552
- SelectMergePprof (ctx context.Context , params * ingestv1.SelectProfilesRequest , maxNodes int64 ) (* profilev1.Profile , error )
553
553
Series (ctx context.Context , params * ingestv1.SeriesRequest ) ([]* typesv1.Labels , error )
554
+
555
+ SelectMatchingProfiles (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (iter.Iterator [Profile ], error )
556
+ SelectMergeByStacktraces (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (* phlaremodel.Tree , error )
557
+ SelectMergeByLabels (ctx context.Context , params * ingestv1.SelectProfilesRequest , by ... string ) ([]* typesv1.Series , error )
558
+ SelectMergeBySpans (ctx context.Context , params * ingestv1.SelectSpanProfileRequest ) (* phlaremodel.Tree , error )
559
+ SelectMergePprof (ctx context.Context , params * ingestv1.SelectProfilesRequest , maxNodes int64 ) (* profilev1.Profile , error )
560
+
554
561
ProfileTypes (context.Context , * connect.Request [ingestv1.ProfileTypesRequest ]) (* connect.Response [ingestv1.ProfileTypesResponse ], error )
555
562
LabelValues (ctx context.Context , req * connect.Request [typesv1.LabelValuesRequest ]) (* connect.Response [typesv1.LabelValuesResponse ], error )
556
563
LabelNames (ctx context.Context , req * connect.Request [typesv1.LabelNamesRequest ]) (* connect.Response [typesv1.LabelNamesResponse ], error )
557
- Open (ctx context.Context ) error
558
- // Sorts profiles for retrieval.
559
- Sort ([]Profile ) []Profile
560
-
561
- // BlockID returns the block ID of the querier, when it is representing a single block.
562
- BlockID () string
563
564
}
564
565
565
566
type TimeBounded interface {
@@ -921,58 +922,92 @@ func MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.M
921
922
otlog .String ("end" , model .Time (request .End ).Time ().String ()),
922
923
otlog .String ("selector" , request .LabelSelector ),
923
924
otlog .String ("profile_type_id" , request .Type .ID ),
925
+ otlog .Object ("hints" , request .Hints ),
924
926
)
925
927
926
928
spanSelector , err := phlaremodel .NewSpanSelector (request .SpanSelector )
927
929
if err != nil {
928
930
return err
929
931
}
930
932
931
- queriers , err := blockGetter (ctx , model .Time (request .Start ), model .Time (request .End ), nil )
932
- if err != nil {
933
- return err
934
- }
935
-
936
- iters , err := SelectMatchingProfiles (ctx , & ingestv1.SelectProfilesRequest {
937
- LabelSelector : request .LabelSelector ,
938
- Type : request .Type ,
939
- Start : request .Start ,
940
- End : request .End ,
941
- }, queriers )
933
+ queriers , err := blockGetter (ctx , model .Time (request .Start ), model .Time (request .End ), request .Hints )
942
934
if err != nil {
943
935
return err
944
936
}
945
937
946
- // send batches of profiles to client and filter via bidi stream.
947
- selectedProfiles , err := filterProfiles [
948
- BidiServerMerge [* ingestv1.MergeSpanProfileResponse , * ingestv1.MergeSpanProfileRequest ],
949
- * ingestv1.MergeSpanProfileResponse ,
950
- * ingestv1.MergeSpanProfileRequest ](ctx , iters , defaultBatchSize , stream )
951
- if err != nil {
952
- return err
938
+ deduplicationNeeded := true
939
+ if request .Hints != nil && request .Hints .Block != nil {
940
+ deduplicationNeeded = request .Hints .Block .Deduplication
953
941
}
954
942
955
943
var m sync.Mutex
956
944
t := new (phlaremodel.Tree )
957
945
g , ctx := errgroup .WithContext (ctx )
958
- for i , querier := range queriers {
959
- querier := querier
960
- i := i
961
- if len (selectedProfiles [i ]) == 0 {
962
- continue
946
+
947
+ // depending on whether we need deduplication or not, there are two different code paths.
948
+ if ! deduplicationNeeded {
949
+ // signal the end of the profile streaming by sending an empty response.
950
+ sp .LogFields (otlog .String ("msg" , "no profile streaming as no deduplication needed" ))
951
+ if err = stream .Send (& ingestv1.MergeSpanProfileResponse {}); err != nil {
952
+ return err
963
953
}
964
- // Sort profiles for better read locality.
965
- // Merge async the result so we can continue streaming profiles.
966
- g .Go (util .RecoverPanic (func () error {
967
- merge , err := querier .MergeBySpans (ctx , iter .NewSliceIterator (querier .Sort (selectedProfiles [i ])), spanSelector )
968
- if err != nil {
969
- return err
954
+
955
+ // in this path we can just merge the profiles from each block and send the result to the client.
956
+ for _ , querier := range queriers {
957
+ querier := querier
958
+ g .Go (util .RecoverPanic (func () error {
959
+ // TODO(simonswine): Split profiles per row group and run the MergeByStacktraces in parallel.
960
+ merge , err := querier .SelectMergeBySpans (ctx , request )
961
+ if err != nil {
962
+ return err
963
+ }
964
+
965
+ m .Lock ()
966
+ t .Merge (merge )
967
+ m .Unlock ()
968
+ return nil
969
+ }))
970
+ }
971
+ } else {
972
+ // in this path we have to go through every profile and deduplicate them.
973
+ iters , err := SelectMatchingProfiles (ctx , & ingestv1.SelectProfilesRequest {
974
+ LabelSelector : request .LabelSelector ,
975
+ Type : request .Type ,
976
+ Start : request .Start ,
977
+ End : request .End ,
978
+ }, queriers )
979
+ if err != nil {
980
+ return err
981
+ }
982
+
983
+ // send batches of profiles to client and filter via bidi stream.
984
+ selectedProfiles , err := filterProfiles [
985
+ BidiServerMerge [* ingestv1.MergeSpanProfileResponse , * ingestv1.MergeSpanProfileRequest ],
986
+ * ingestv1.MergeSpanProfileResponse ,
987
+ * ingestv1.MergeSpanProfileRequest ](ctx , iters , defaultBatchSize , stream )
988
+ if err != nil {
989
+ return err
990
+ }
991
+
992
+ for i , querier := range queriers {
993
+ querier := querier
994
+ i := i
995
+ if len (selectedProfiles [i ]) == 0 {
996
+ continue
970
997
}
971
- m .Lock ()
972
- t .Merge (merge )
973
- m .Unlock ()
974
- return nil
975
- }))
998
+ // Sort profiles for better read locality.
999
+ // Merge async the result so we can continue streaming profiles.
1000
+ g .Go (util .RecoverPanic (func () error {
1001
+ merge , err := querier .MergeBySpans (ctx , iter .NewSliceIterator (querier .Sort (selectedProfiles [i ])), spanSelector )
1002
+ if err != nil {
1003
+ return err
1004
+ }
1005
+ m .Lock ()
1006
+ t .Merge (merge )
1007
+ m .Unlock ()
1008
+ return nil
1009
+ }))
1010
+ }
976
1011
}
977
1012
978
1013
// Signals the end of the profile streaming by sending an empty response.
@@ -992,6 +1027,12 @@ func MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.M
992
1027
}
993
1028
994
1029
// sends the final result to the client.
1030
+ treeBytes := buf .Bytes ()
1031
+ sp .LogFields (
1032
+ otlog .String ("msg" , "sending the final result to the client" ),
1033
+ otlog .Int ("tree_bytes" , len (treeBytes )),
1034
+ )
1035
+
995
1036
sp .LogFields (otlog .String ("msg" , "sending the final result to the client" ))
996
1037
err = stream .Send (& ingestv1.MergeSpanProfileResponse {
997
1038
Result : & ingestv1.MergeSpanProfileResult {
@@ -1720,6 +1761,89 @@ func (b *singleBlockQuerier) SelectMergeByStacktraces(ctx context.Context, param
1720
1761
return r .Tree ()
1721
1762
}
1722
1763
1764
+ func (b * singleBlockQuerier ) SelectMergeBySpans (ctx context.Context , params * ingestv1.SelectSpanProfileRequest ) (* phlaremodel.Tree , error ) {
1765
+ sp , ctx := opentracing .StartSpanFromContext (ctx , "SelectMergeBySpans - Block" )
1766
+ defer sp .Finish ()
1767
+ sp .SetTag ("block ULID" , b .meta .ULID .String ())
1768
+
1769
+ if err := b .Open (ctx ); err != nil {
1770
+ return nil , err
1771
+ }
1772
+ matchers , err := parser .ParseMetricSelector (params .LabelSelector )
1773
+ if err != nil {
1774
+ return nil , status .Error (codes .InvalidArgument , "failed to parse label selectors: " + err .Error ())
1775
+ }
1776
+ if params .Type == nil {
1777
+ return nil , errors .New ("no profileType given" )
1778
+ }
1779
+ spans , err := phlaremodel .NewSpanSelector (params .SpanSelector )
1780
+ if err != nil {
1781
+ return nil , err
1782
+ }
1783
+ matchers = append (matchers , phlaremodel .SelectorFromProfileType (params .Type ))
1784
+
1785
+ postings , err := PostingsForMatchers (b .index , nil , matchers ... )
1786
+ if err != nil {
1787
+ return nil , err
1788
+ }
1789
+
1790
+ var (
1791
+ chks = make ([]index.ChunkMeta , 1 )
1792
+ lblsPerRef = make (map [int64 ]struct {})
1793
+ )
1794
+
1795
+ // get all relevant labels/fingerprints
1796
+ for postings .Next () {
1797
+ _ , err := b .index .Series (postings .At (), nil , & chks )
1798
+ if err != nil {
1799
+ return nil , err
1800
+ }
1801
+ lblsPerRef [int64 (chks [0 ].SeriesIndex )] = struct {}{}
1802
+ }
1803
+ r := symdb .NewResolver (ctx , b .symbols )
1804
+ defer r .Release ()
1805
+
1806
+ it := query .NewBinaryJoinIterator (
1807
+ 0 ,
1808
+ b .profiles .columnIter (ctx , "SeriesIndex" , query .NewMapPredicate (lblsPerRef ), "" ),
1809
+ b .profiles .columnIter (ctx , "TimeNanos" , query .NewIntBetweenPredicate (model .Time (params .Start ).UnixNano (), model .Time (params .End ).UnixNano ()), "" ),
1810
+ )
1811
+
1812
+ if b .meta .Version >= 2 {
1813
+ it = query .NewBinaryJoinIterator (0 ,
1814
+ it ,
1815
+ b .profiles .columnIter (ctx , "StacktracePartition" , nil , "StacktracePartition" ),
1816
+ )
1817
+ }
1818
+ buf := make ([][]parquet.Value , 1 )
1819
+
1820
+ // todo: we should stream profile to merge instead of loading all in memory.
1821
+ // This is a temporary solution for now since there's a memory corruption happening.
1822
+ rows , err := iter.Slice [rowProfile ](
1823
+ & RowsIterator [rowProfile ]{
1824
+ rows : it ,
1825
+ at : func (ir * query.IteratorResult ) rowProfile {
1826
+ buf = ir .Columns (buf , "StacktracePartition" )
1827
+ if len (buf [0 ]) == 0 {
1828
+ return rowProfile {
1829
+ rowNum : ir .RowNumber [0 ],
1830
+ }
1831
+ }
1832
+ return rowProfile {
1833
+ rowNum : ir .RowNumber [0 ],
1834
+ partition : buf [0 ][0 ].Uint64 (),
1835
+ }
1836
+ },
1837
+ })
1838
+ if err != nil {
1839
+ return nil , err
1840
+ }
1841
+ if err := mergeBySpans [rowProfile ](ctx , b .profiles .file , iter .NewSliceIterator (rows ), r , spans ); err != nil {
1842
+ return nil , err
1843
+ }
1844
+ return r .Tree ()
1845
+ }
1846
+
1723
1847
func (b * singleBlockQuerier ) SelectMergePprof (ctx context.Context , params * ingestv1.SelectProfilesRequest , maxNodes int64 ) (* profilev1.Profile , error ) {
1724
1848
sp , ctx := opentracing .StartSpanFromContext (ctx , "SelectMergePprof - Block" )
1725
1849
defer sp .Finish ()
0 commit comments