@@ -2,8 +2,6 @@ package combiner
2
2
3
3
import (
4
4
"net/http"
5
- "sort"
6
- "time"
7
5
8
6
"github.com/grafana/tempo/pkg/api"
9
7
"github.com/grafana/tempo/pkg/search"
@@ -40,8 +38,8 @@ func (s *SearchJobResponse) IsMetadata() bool {
40
38
var _ GRPCCombiner [* tempopb.SearchResponse ] = (* genericCombiner [* tempopb.SearchResponse ])(nil )
41
39
42
40
// NewSearch returns a search combiner
43
- func NewSearch (keepMostRecent int ) Combiner {
44
- metadataCombiner := traceql .NewMetadataCombiner (keepMostRecent )
41
+ func NewSearch (limit int , keepMostRecent bool ) Combiner {
42
+ metadataCombiner := traceql .NewMetadataCombiner (limit , keepMostRecent )
45
43
diffTraces := map [string ]struct {}{}
46
44
completedThroughTracker := & ShardCompletionTracker {}
47
45
@@ -95,18 +93,25 @@ func NewSearch(keepMostRecent int) Combiner {
95
93
Metrics : current .Metrics ,
96
94
}
97
95
98
- completedThroughSeconds := completedThroughTracker .completedThroughSeconds
99
- // if all jobs are completed then let's just return everything the combiner has
100
- if current .Metrics .CompletedJobs == current .Metrics .TotalJobs && current .Metrics .TotalJobs > 0 {
101
- completedThroughSeconds = 1
102
- }
103
-
104
- // if we've not completed any shards, then return nothing
105
- if completedThroughSeconds == 0 {
106
- return diff , nil
96
+ metadataFn := metadataCombiner .Metadata
97
+ if keepMostRecent {
98
+ metadataFn = func () []* tempopb.TraceSearchMetadata {
99
+ completedThroughSeconds := completedThroughTracker .completedThroughSeconds
100
+ // if all jobs are completed then let's just return everything the combiner has
101
+ if current .Metrics .CompletedJobs == current .Metrics .TotalJobs && current .Metrics .TotalJobs > 0 {
102
+ completedThroughSeconds = 1
103
+ }
104
+
105
+ // if we've not completed any shards, then return nothing
106
+ if completedThroughSeconds == 0 {
107
+ return nil
108
+ }
109
+
110
+ return metadataCombiner .MetadataAfter (completedThroughSeconds )
111
+ }
107
112
}
108
113
109
- for _ , tr := range metadataCombiner . MetadataAfter ( completedThroughSeconds ) {
114
+ for _ , tr := range metadataFn ( ) {
110
115
// if not in the map, skip. we haven't seen an update
111
116
if _ , ok := diffTraces [tr .TraceID ]; ! ok {
112
117
continue
@@ -116,35 +121,20 @@ func NewSearch(keepMostRecent int) Combiner {
116
121
diff .Traces = append (diff .Traces , tr )
117
122
}
118
123
119
- sort .Slice (diff .Traces , func (i , j int ) bool {
120
- return diff .Traces [i ].StartTimeUnixNano > diff .Traces [j ].StartTimeUnixNano
121
- })
122
-
123
124
addRootSpanNotReceivedText (diff .Traces )
124
125
125
126
return diff , nil
126
127
},
127
128
// search combiner doesn't use current in the way i would have expected. it only tracks metrics through current and uses the results map for the actual traces.
128
129
// should we change this?
129
130
quit : func (_ * tempopb.SearchResponse ) bool {
130
- // are we tracking a limit at all?
131
- if keepMostRecent <= 0 {
132
- return false
133
- }
134
-
135
131
completedThroughSeconds := completedThroughTracker .completedThroughSeconds
136
132
// have we completed any shards?
137
133
if completedThroughSeconds == 0 {
138
- return false
139
- }
140
-
141
- // do we have enought?
142
- if metadataCombiner .Count () < keepMostRecent {
143
- return false
134
+ completedThroughSeconds = traceql .TimestampNever
144
135
}
145
136
146
- // is our oldest trace newer than the completedThrough?
147
- return metadataCombiner .OldestTimestampNanos () > uint64 (completedThroughSeconds )* uint64 (time .Second )
137
+ return metadataCombiner .IsCompleteFor (completedThroughSeconds )
148
138
},
149
139
}
150
140
initHTTPCombiner (c , api .HeaderAcceptJSON )
@@ -159,8 +149,8 @@ func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) {
159
149
}
160
150
}
161
151
162
- func NewTypedSearch (limit int ) GRPCCombiner [* tempopb.SearchResponse ] {
163
- return NewSearch (limit ).(GRPCCombiner [* tempopb.SearchResponse ])
152
+ func NewTypedSearch (limit int , keepMostRecent bool ) GRPCCombiner [* tempopb.SearchResponse ] {
153
+ return NewSearch (limit , keepMostRecent ).(GRPCCombiner [* tempopb.SearchResponse ])
164
154
}
165
155
166
156
// ShardCompletionTracker
0 commit comments