Skip to content

Commit 4eb9fb3

Browse files
committed
fix(metric): apply influxdb query with new format
1 parent b2b5aa5 commit 4eb9fb3

File tree

4 files changed

+41
-136
lines changed

4 files changed

+41
-136
lines changed

pkg/constant/constant.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ const (
4141
PipelineUID string = "pipeline_uid"
4242
PipelineReleaseID string = "pipeline_release_id"
4343
PipelineReleaseUID string = "pipeline_release_uid"
44-
ModelID string = "model_id"
4544
TriggerMode string = "trigger_mode"
4645
Status string = "status"
4746
)

pkg/handler/publichandler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1070,7 +1070,7 @@ func (h *PublicHandler) ListModelTriggerChartRecords(ctx context.Context, req *m
10701070
resp, err := h.Service.ListModelTriggerChartRecords(ctx, req, ctxUserUID)
10711071
if err != nil {
10721072
span.SetStatus(1, err.Error())
1073-
return nil, fmt.Errorf("fetching credit chart records: %w", err)
1073+
return nil, err
10741074
}
10751075

10761076
logger.Info(string(custom_otel.NewLogMessage(

pkg/repository/influx.go

Lines changed: 39 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/instill-ai/mgmt-backend/pkg/logger"
2424
"github.com/instill-ai/x/paginate"
2525

26+
errdomain "github.com/instill-ai/mgmt-backend/pkg/errors"
2627
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
2728
)
2829

@@ -449,20 +450,19 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
449450
return records, nil
450451
}
451452

452-
// todo: merge changes for new pipeline dashboard endpoints and refactor this part
453-
// const qModelTriggerChartRecords = `
454-
// from(bucket: "%s")
455-
// |> range(start: %s, stop: %s)
456-
// |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s")
457-
// |> filter(fn: (r) => r._field == "trigger_time")
458-
// |> group(columns:["requester_uid"])
459-
// |> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s)
460-
// `
453+
const qModelTriggerChartRecords = `
454+
from(bucket: "%s")
455+
|> range(start: %s, stop: %s)
456+
|> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s")
457+
|> filter(fn: (r) => r._field == "trigger_time")
458+
|> group(columns:["requester_uid"])
459+
|> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s)
460+
`
461461

462462
// ListModelTriggerChartRecordsParams contains the required information to
463463
// query the model triggers of a namespace.
464464
type ListModelTriggerChartRecordsParams struct {
465-
NamespaceID string
465+
RequesterID string
466466
RequesterUID uuid.UUID
467467
AggregationWindow time.Duration
468468
Start time.Time
@@ -473,146 +473,52 @@ func (i *influxDB) ListModelTriggerChartRecords(
473473
ctx context.Context,
474474
p ListModelTriggerChartRecordsParams,
475475
) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) {
476-
// todo: merge changes for new pipeline dashboard endpoints and refactor this part
477-
// l, _ := logger.GetZapLogger(ctx)
478-
// l = l.With(zap.Reflect("triggerChartParams", p))
479-
480-
// query := fmt.Sprintf(
481-
// qModelTriggerChartRecords,
482-
// i.Bucket(),
483-
// p.Start.Format(time.RFC3339Nano),
484-
// p.Stop.Format(time.RFC3339Nano),
485-
// p.RequesterUID.String(),
486-
// p.AggregationWindow,
487-
// AggregationWindowOffset(p.Start).String(),
488-
// )
489-
// result, err := i.QueryAPI().Query(ctx, query)
490-
// if err != nil {
491-
// return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err)
492-
// }
493-
//
494-
// defer result.Close()
495-
//
496-
// record := &mgmtpb.ModelTriggerChartRecord{
497-
// RequesterId: p.NamespaceID,
498-
// TimeBuckets: []*timestamppb.Timestamp{},
499-
// TriggerCounts: []int32{},
500-
// }
501-
//
502-
// // Until filtering and grouping are implemented, we'll only have one record
503-
// // (total triggers by requester).
504-
// records := []*mgmtpb.ModelTriggerChartRecord{record}
505-
//
506-
// for result.Next() {
507-
// t := result.Record().Time()
508-
// record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t))
509-
//
510-
// v, match := result.Record().Value().(int64)
511-
// if !match {
512-
// l.With(zap.Time("_time", result.Record().Time())).
513-
// Error("Missing count on model trigger chart record.")
514-
// }
515-
//
516-
// record.TriggerCounts = append(record.TriggerCounts, int32(v))
517-
// }
518-
//
519-
// if result.Err() != nil {
520-
// return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err)
521-
// }
522-
//
523-
// if result.Record() == nil {
524-
// return nil, nil
525-
// }
526-
//
527-
// return &mgmtpb.ListModelTriggerChartRecordsResponse{
528-
// ModelTriggerChartRecords: records,
529-
// }, nil
530-
logger, _ := logger.GetZapLogger(ctx)
476+
l, _ := logger.GetZapLogger(ctx)
477+
l = l.With(zap.Reflect("triggerChartParams", p))
531478

532479
query := fmt.Sprintf(
533-
`base =
534-
from(bucket: "%s")
535-
|> range(start: %v, stop: %v)
536-
|> filter(fn: (r) => r["_measurement"] == "model.trigger.v1" and r.requester_uid == "%s")
537-
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
538-
bucketBase =
539-
base
540-
|> group(columns: ["model_uid"])
541-
|> sort(columns: ["trigger_time"])
542-
bucketTrigger =
543-
bucketBase
544-
|> aggregateWindow(
545-
every: %v,
546-
column: "trigger_time",
547-
fn: count,
548-
createEmpty: false,
549-
)
550-
bucketDuration =
551-
bucketBase
552-
|> aggregateWindow(
553-
every: %v,
554-
fn: sum,
555-
column: "compute_time_duration",
556-
createEmpty: false,
557-
)
558-
bucket =
559-
join(
560-
tables: {t1: bucketTrigger, t2: bucketDuration},
561-
on: ["_start", "_stop", "_time", "model_uid"],
562-
)
563-
nameMap =
564-
base
565-
|> keep(columns: ["trigger_time", "model_id", "model_uid"])
566-
|> group(columns: ["model_uid"])
567-
|> top(columns: ["trigger_time"], n: 1)
568-
|> drop(columns: ["trigger_time"])
569-
join(tables: {t1: bucket, t2: nameMap}, on: ["model_uid"])`,
570-
i.bucket,
480+
qModelTriggerChartRecords,
481+
i.Bucket(),
571482
p.Start.Format(time.RFC3339Nano),
572483
p.Stop.Format(time.RFC3339Nano),
573484
p.RequesterUID.String(),
574485
p.AggregationWindow,
575-
p.AggregationWindow,
486+
AggregationWindowOffset(p.Start).String(),
576487
)
577-
578-
result, err := i.api.Query(ctx, query)
488+
result, err := i.QueryAPI().Query(ctx, query)
579489
if err != nil {
580-
return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
490+
return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err)
581491
}
582492

583-
var currentTablePosition = -1
584-
var chartRecord *mgmtpb.ModelTriggerChartRecord
585-
var records []*mgmtpb.ModelTriggerChartRecord
586-
// Iterate over query response
587-
for result.Next() {
588-
// Notice when group key has changed
589-
if result.TableChanged() {
590-
logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String()))
591-
}
493+
defer result.Close()
592494

593-
if result.Record().Table() != currentTablePosition {
594-
chartRecord = &mgmtpb.ModelTriggerChartRecord{}
495+
record := &mgmtpb.ModelTriggerChartRecord{
496+
RequesterId: p.RequesterID,
497+
TimeBuckets: []*timestamppb.Timestamp{},
498+
TriggerCounts: []int32{},
499+
}
595500

596-
if v, match := result.Record().ValueByKey(constant.ModelID).(string); match {
597-
chartRecord.ModelId = &v
598-
}
599-
chartRecord.TimeBuckets = []*timestamppb.Timestamp{}
600-
chartRecord.TriggerCounts = []int32{}
601-
records = append(records, chartRecord)
602-
currentTablePosition = result.Record().Table()
603-
}
501+
// Until filtering and grouping are implemented, we'll only have one record
502+
// (total triggers by requester).
503+
records := []*mgmtpb.ModelTriggerChartRecord{record}
604504

605-
if v, match := result.Record().ValueByKey("_time").(time.Time); match {
606-
chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(v))
607-
}
608-
if v, match := result.Record().ValueByKey(constant.TriggerTime).(int64); match {
609-
chartRecord.TriggerCounts = append(chartRecord.TriggerCounts, int32(v))
505+
for result.Next() {
506+
t := result.Record().Time()
507+
record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t))
508+
509+
v, match := result.Record().Value().(int64)
510+
if !match {
511+
l.With(zap.Time("_time", result.Record().Time())).
512+
Error("Missing count on model trigger chart record.")
610513
}
514+
515+
record.TriggerCounts = append(record.TriggerCounts, int32(v))
611516
}
612-
// Check for an error
517+
613518
if result.Err() != nil {
614-
return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
519+
return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err)
615520
}
521+
616522
if result.Record() == nil {
617523
return nil, nil
618524
}

pkg/service/metric.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (s *service) ListModelTriggerChartRecords(
176176

177177
now := time.Now().UTC()
178178
p := repository.ListModelTriggerChartRecordsParams{
179-
NamespaceID: req.GetRequesterId(),
179+
RequesterID: req.GetRequesterId(),
180180
RequesterUID: nsUID,
181181

182182
// Default values

0 commit comments

Comments
 (0)