From 7fa35230b19b9d06a5c8b7b439e6f45525c6098a Mon Sep 17 00:00:00 2001 From: Jeremy Shih Date: Mon, 28 Oct 2024 16:01:21 +0800 Subject: [PATCH] feat(metric): model trigger chart record api --- .gitignore | 3 +- go.mod | 2 + pkg/handler/publichandler.go | 38 ++++++++++++++++- pkg/repository/influx.go | 81 ++++++++++++++++++++++++++++++++++++ pkg/service/metric.go | 42 +++++++++++++++++++ pkg/service/service.go | 1 + 6 files changed, 165 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index c4aae290..86c93ff8 100644 --- a/.gitignore +++ b/.gitignore @@ -110,4 +110,5 @@ tmp bin # local -.DS_Store \ No newline at end of file +.DS_Store +.idea diff --git a/go.mod b/go.mod index 0e08564f..b274c118 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.22.5 retract v0.3.2 // Published accidentally. +replace github.com/instill-ai/protogen-go => ./../protogen-go + require ( github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/frankban/quicktest v1.14.6 diff --git a/pkg/handler/publichandler.go b/pkg/handler/publichandler.go index bbfdf14c..5499d7ab 100644 --- a/pkg/handler/publichandler.go +++ b/pkg/handler/publichandler.go @@ -23,11 +23,11 @@ import ( "github.com/instill-ai/mgmt-backend/pkg/logger" "github.com/instill-ai/mgmt-backend/pkg/service" "github.com/instill-ai/mgmt-backend/pkg/usage" + "github.com/instill-ai/x/checkfield" custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel" healthcheckPB "github.com/instill-ai/protogen-go/common/healthcheck/v1beta" mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta" - checkfield "github.com/instill-ai/x/checkfield" ) // TODO: Validate mask based on the field behavior. Currently, the fields are hard-coded. @@ -1049,6 +1049,42 @@ func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req return &resp, nil } +// ListModelTriggerChartRecords returns a timeline of model trigger counts for a given requester. The +// response will contain one set of records (datapoints), representing the amount of triggers in a time bucket. +func (h *PublicHandler) ListModelTriggerChartRecords(ctx context.Context, req *mgmtPB.ListModelTriggerChartRecordsRequest) (*mgmtPB.ListModelTriggerChartRecordsResponse, error) { + + eventName := "ListModelTriggerChartRecords" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + logger, _ := logger.GetZapLogger(ctx) + + ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + // + // resp, err := h.Service.ListPipelineTriggerChartRecords(ctx, req, ctxUserUID) + // if err != nil { + // span.SetStatus(1, err.Error()) + // return nil, fmt.Errorf("fetching credit chart records: %w", err) + // } + // + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + ctxUserUID, + eventName, + ))) + // + // return resp, nil + // } + return nil, nil +} + func (h *PublicHandler) ListUserMemberships(ctx context.Context, req *mgmtPB.ListUserMembershipsRequest) (*mgmtPB.ListUserMembershipsResponse, error) { eventName := "ListUserMemberships" diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index 73816d05..ffe216e4 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/gofrs/uuid" "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/log" "go.einride.tech/aip/filtering" @@ -22,6 +23,7 @@ import ( "github.com/instill-ai/mgmt-backend/pkg/logger" "github.com/instill-ai/x/paginate" + errdomain "github.com/instill-ai/mgmt-backend/pkg/errors" mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta" ) @@ -33,6 +35,7 @@ type InfluxDB interface { QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error) QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error) + ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) Bucket() string QueryAPI() api.QueryAPI @@ -447,6 +450,84 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s return records, nil } +const qModelTriggerChartRecords = ` +from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s") + |> filter(fn: (r) => r._field == "trigger_time") + |> group(columns:["requester_uid"]) + |> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s) +` + +// ListModelTriggerChartRecordsParams contains the required information to +// query the model triggers of a namespace. +type ListModelTriggerChartRecordsParams struct { + NamespaceID string + RequesterUID uuid.UUID + AggregationWindow time.Duration + Start time.Time + Stop time.Time +} + +func (i *influxDB) ListModelTriggerChartRecords( + ctx context.Context, + p ListModelTriggerChartRecordsParams, +) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) { + l, _ := logger.GetZapLogger(ctx) + l = l.With(zap.Reflect("triggerChartParams", p)) + + query := fmt.Sprintf( + qModelTriggerChartRecords, + i.Bucket(), + p.Start.Format(time.RFC3339Nano), + p.Stop.Format(time.RFC3339Nano), + p.RequesterUID.String(), + p.AggregationWindow, + AggregationWindowOffset(p.Start).String(), + ) + result, err := i.QueryAPI().Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err) + } + + defer result.Close() + + record := &mgmtpb.ModelTriggerChartRecord{ + RequesterId: p.NamespaceID, + TimeBuckets: []*timestamppb.Timestamp{}, + TriggerCounts: []int32{}, + } + + // Until filtering and grouping are implemented, we'll only have one record + // (total triggers by requester). + records := []*mgmtpb.ModelTriggerChartRecord{record} + + for result.Next() { + t := result.Record().Time() + record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t)) + + v, match := result.Record().Value().(int64) + if !match { + l.With(zap.Time("_time", result.Record().Time())). + Error("Missing count on model trigger chart record.") + } + + record.TriggerCounts = append(record.TriggerCounts, int32(v)) + } + + if result.Err() != nil { + return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err) + } + + if result.Record() == nil { + return nil, nil + } + + return &mgmtpb.ListModelTriggerChartRecordsResponse{ + ModelTriggerChartRecords: records, + }, nil +} + // TranspileFilter transpiles a parsed AIP filter expression to Flux query expression func (i *influxDB) transpileFilter(filter filtering.Filter) (string, error) { return (&Transpiler{ diff --git a/pkg/service/metric.go b/pkg/service/metric.go index f3b2986c..55be387b 100644 --- a/pkg/service/metric.go +++ b/pkg/service/metric.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "time" "go.einride.tech/aip/filtering" "gorm.io/gorm" @@ -163,6 +164,47 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg return pipelineTriggerChartRecords, nil } +func (s *service) ListModelTriggerChartRecords( + ctx context.Context, + req *mgmtpb.ListModelTriggerChartRecordsRequest, + ctxUserUID uuid.UUID, +) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) { + nsUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID) + if err != nil { + return nil, fmt.Errorf("checking user permissions: %w", err) + } + + now := time.Now().UTC() + p := repository.ListModelTriggerChartRecordsParams{ + NamespaceID: req.GetRequesterId(), + RequesterUID: nsUID, + + // Default values + AggregationWindow: 1 * time.Hour, + Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()), + Stop: now, + } + + if req.GetAggregationWindow() != "" { + window, err := time.ParseDuration(req.GetAggregationWindow()) + if err != nil { + return nil, fmt.Errorf("%w: extracting duration from aggregation window: %w", errdomain.ErrInvalidArgument, err) + } + + p.AggregationWindow = window + } + + if req.GetStart() != nil { + p.Start = req.GetStart().AsTime() + } + + if req.GetStop() != nil { + p.Stop = req.GetStop().AsTime() + } + + return s.influxDB.ListModelTriggerChartRecords(ctx, p) +} + // GrantedNamespaceUID returns the UID of a namespace, provided the // authenticated user has access to it. func (s *service) GrantedNamespaceUID(ctx context.Context, namespaceID string, authenticatedUserUID uuid.UUID) (uuid.UUID, error) { diff --git a/pkg/service/service.go b/pkg/service/service.go index da3c893f..1b05c176 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -73,6 +73,7 @@ type Service interface { ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error) ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error) + ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error) DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error) DBUsers2PBUsers(ctx context.Context, dbUsers []*datamodel.Owner) ([]*mgmtpb.User, error)