@@ -34,6 +34,7 @@ var defaultAggregationWindow = time.Hour.Nanoseconds()
 type InfluxDB interface {
 	QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error)
 	QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error)
+	QueryModelTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.ModelTriggerTableRecord, totalSize int64, nextPageToken string, err error)
 	QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error)
 	ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)
@@ -121,6 +122,192 @@ func AggregationWindowOffset(t time.Time) time.Duration {
 	return t.Sub(startOfDay)
 }
 
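+// QueryModelTriggerTableRecords returns a paginated, per-model summary of trigger
+// activity: the completed and errored trigger counts together with each model's
+// most recent trigger time, sorted by that time in descending order.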
+func (i *influxDB) QueryModelTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string,
+	pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.ModelTriggerTableRecord,
+	totalSize int64, nextPageToken string, err error) {
+
+	logger, _ := logger.GetZapLogger(ctx)
+
+	if pageSize == 0 {
+		pageSize = DefaultPageSize
+	} else if pageSize > MaxPageSize {
+		pageSize = MaxPageSize
+	}
+
+	start := time.Time{}.Format(time.RFC3339Nano)
+	stop := time.Now().Format(time.RFC3339Nano)
+	mostRecentTimeFilter := time.Now().Format(time.RFC3339Nano)
+
+	// TODO: validate owner uid from token
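+	// A non-empty page token carries the most_recent_trigger_time of the last row
+	// on the previous page; it is shifted back by one nanosecond and used below as
+	// an exclusive upper bound so already-returned models are skipped.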
+	if pageToken != "" {
+		mostRecentTime, _, err := paginate.DecodeToken(pageToken)
+		if err != nil {
+			return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid page token: %s", err.Error())
+		}
+		mostRecentTime = mostRecentTime.Add(time.Duration(-1))
+		mostRecentTimeFilter = mostRecentTime.Format(time.RFC3339Nano)
+	}
+
+	// TODO: design better filter expression to flux transpiler
+	expr, err := i.transpileFilter(filter)
+	if err != nil {
+		return nil, 0, "", status.Error(codes.Internal, err.Error())
+	}
+
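+	// Terms prefixed with constant.Start / constant.Stop in the transpiled filter
+	// carry the query time range after an "@" separator; extract them into the
+	// range() bounds and strip them from the remaining Flux filter expression.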
+	if expr != "" {
+		exprs := strings.Split(expr, "&&")
+		for i, expr := range exprs {
+			if strings.HasPrefix(expr, constant.Start) {
+				start = strings.Split(expr, "@")[1]
+				exprs[i] = ""
+			}
+			if strings.HasPrefix(expr, constant.Stop) {
+				stop = strings.Split(expr, "@")[1]
+				exprs[i] = ""
+			}
+		}
+		expr = strings.Join(exprs, "")
+	}
+
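+	// base reads the raw "model.trigger.v1" points and pivots fields into columns.
+	// triggerRank keeps each model's most recent trigger time, triggerCount counts
+	// triggers per model and status, and nameMap recovers the latest model_id for
+	// each model_uid; the final joins yield one summary row per model.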
+	baseQuery := fmt.Sprintf(
+		`base =
+			from(bucket: "%v")
+				|> range(start: %v, stop: %v)
+				|> filter(fn: (r) => r["_measurement"] == "model.trigger.v1")
+				|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
+				%v
+				%v
+		triggerRank =
+			base
+				|> drop(
+					columns: [
+						"owner_uid",
+						"trigger_mode",
+						"compute_time_duration",
+						"model_trigger_id",
+						"status",
+					],
+				)
+				|> group(columns: ["model_uid"])
+				|> map(fn: (r) => ({r with trigger_time: time(v: r.trigger_time)}))
+				|> sort(columns: ["trigger_time"], desc: true)
+				|> first(column: "trigger_time")
+				|> rename(columns: {trigger_time: "most_recent_trigger_time"})
+		triggerCount =
+			base
+				|> drop(
+					columns: ["owner_uid", "trigger_mode", "compute_time_duration", "model_trigger_id"],
+				)
+				|> group(columns: ["model_uid", "status"])
+				|> count(column: "trigger_time")
+				|> rename(columns: {trigger_time: "trigger_count"})
+				|> group(columns: ["model_uid"])
+		triggerTable =
+			join(tables: {t1: triggerRank, t2: triggerCount}, on: ["model_uid"])
+				|> group()
+				|> pivot(
+					rowKey: ["model_uid", "most_recent_trigger_time"],
+					columnKey: ["status"],
+					valueColumn: "trigger_count",
+				)
+				|> filter(
+					fn: (r) => r["most_recent_trigger_time"] < time(v: %v)
+				)
+		nameMap =
+			base
+				|> keep(columns: ["trigger_time", "model_id", "model_uid"])
+				|> group(columns: ["model_uid"])
+				|> top(columns: ["trigger_time"], n: 1)
+				|> drop(columns: ["trigger_time"])
+				|> group()
+		join(tables: {t1: triggerTable, t2: nameMap}, on: ["model_uid"])`,
+		i.bucket,
+		start,
+		stop,
+		ownerQueryString,
+		expr,
+		mostRecentTimeFilter,
+	)
+
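+	// Page query: order the summary rows by most recent trigger time (newest first)
+	// and cap the result at pageSize rows.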
+	query := fmt.Sprintf(
+		`%v
+			|> group()
+			|> sort(columns: ["most_recent_trigger_time"], desc: true)
+			|> limit(n: %v)`,
+		baseQuery,
+		pageSize,
+	)
+
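+	// Total query: count all matching model rows so we know whether more pages remain.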
+	totalQuery := fmt.Sprintf(
+		`%v
+			|> group()
+			|> count(column: "model_uid")`,
+		baseQuery,
+	)
+
+	var lastTimestamp time.Time
+
+	result, err := i.api.Query(ctx, query)
+	if err != nil {
+		return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
+	} else {
+		// Iterate over query response
+		for result.Next() {
+			// Notice when group key has changed
+			if result.TableChanged() {
+				logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String()))
+			}
+
+			tableRecord := &mgmtpb.ModelTriggerTableRecord{}
+
+			if v, match := result.Record().ValueByKey(constant.ModelID).(string); match {
+				tableRecord.ModelId = v
+			}
+			if v, match := result.Record().ValueByKey(constant.ModelUID).(string); match {
+				tableRecord.ModelUid = v
+			}
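+			// The status pivot in baseQuery produces one count column per status
+			// value; map those columns onto the completed/errored counters.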
+			if v, match := result.Record().ValueByKey(mgmtpb.Status_STATUS_COMPLETED.String()).(int64); match {
+				tableRecord.TriggerCountCompleted = int32(v)
+			}
+			if v, match := result.Record().ValueByKey(mgmtpb.Status_STATUS_ERRORED.String()).(int64); match {
+				tableRecord.TriggerCountErrored = int32(v)
+			}
+
+			records = append(records, tableRecord)
+		}
+
+		// Check for an error
+		if result.Err() != nil {
+			return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", result.Err().Error())
+		}
+		if result.Record() == nil {
+			return nil, 0, "", nil
+		}
+
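+		// Keep the timestamp of the last row on this page; it seeds the next page token.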
+		if v, match := result.Record().ValueByKey("most_recent_trigger_time").(time.Time); match {
+			lastTimestamp = v
+		}
+	}
+
+	var total int64
+	totalQueryResult, err := i.api.Query(ctx, totalQuery)
+	if err != nil {
+		return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid total query: %s", err.Error())
+	} else {
+		if totalQueryResult.Next() {
+			total = totalQueryResult.Record().ValueByKey(constant.ModelUID).(int64)
+		}
+	}
+
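+	// Return a non-empty next-page token only when more records remain beyond this page.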
+	if int64(len(records)) < total {
+		pageToken = paginate.EncodeToken(lastTimestamp, owner)
+	} else {
+		pageToken = ""
+	}
+
+	return records, int64(len(records)), pageToken, nil
+}
+
 func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) {
 
 	logger, _ := logger.GetZapLogger(ctx)