Skip to content

Commit

Permalink
feat(metric): model trigger chart record api
Browse files Browse the repository at this point in the history
  • Loading branch information
joremysh committed Oct 29, 2024
1 parent c84e555 commit 7fa3523
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 2 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,5 @@ tmp
bin

# local
.DS_Store
.DS_Store
.idea
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.22.5

retract v0.3.2 // Published accidentally.

replace github.com/instill-ai/protogen-go => ./../protogen-go

require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/frankban/quicktest v1.14.6
Expand Down
38 changes: 37 additions & 1 deletion pkg/handler/publichandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"github.com/instill-ai/mgmt-backend/pkg/logger"
"github.com/instill-ai/mgmt-backend/pkg/service"
"github.com/instill-ai/mgmt-backend/pkg/usage"
"github.com/instill-ai/x/checkfield"

custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel"
healthcheckPB "github.com/instill-ai/protogen-go/common/healthcheck/v1beta"
mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
checkfield "github.com/instill-ai/x/checkfield"
)

// TODO: Validate mask based on the field behavior. Currently, the fields are hard-coded.
Expand Down Expand Up @@ -1049,6 +1049,42 @@ func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req
return &resp, nil
}

// ListModelTriggerChartRecords returns a timeline of model trigger counts for a given requester. The
// response will contain one set of records (datapoints), representing the amount of triggers in a time bucket.
func (h *PublicHandler) ListModelTriggerChartRecords(ctx context.Context, req *mgmtPB.ListModelTriggerChartRecordsRequest) (*mgmtPB.ListModelTriggerChartRecordsResponse, error) {

eventName := "ListModelTriggerChartRecords"
ctx, span := tracer.Start(ctx, eventName,
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

logUUID, _ := uuid.NewV4()
logger, _ := logger.GetZapLogger(ctx)

ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}
//
// resp, err := h.Service.ListPipelineTriggerChartRecords(ctx, req, ctxUserUID)
// if err != nil {
// span.SetStatus(1, err.Error())
// return nil, fmt.Errorf("fetching credit chart records: %w", err)
// }
//
logger.Info(string(custom_otel.NewLogMessage(
span,
logUUID.String(),
ctxUserUID,
eventName,
)))
//
// return resp, nil
// }
return nil, nil
}

func (h *PublicHandler) ListUserMemberships(ctx context.Context, req *mgmtPB.ListUserMembershipsRequest) (*mgmtPB.ListUserMembershipsResponse, error) {

eventName := "ListUserMemberships"
Expand Down
81 changes: 81 additions & 0 deletions pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/gofrs/uuid"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/log"
"go.einride.tech/aip/filtering"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/instill-ai/mgmt-backend/pkg/logger"
"github.com/instill-ai/x/paginate"

errdomain "github.com/instill-ai/mgmt-backend/pkg/errors"
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
)

Expand All @@ -33,6 +35,7 @@ type InfluxDB interface {
QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error)
QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error)
ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)

Bucket() string
QueryAPI() api.QueryAPI
Expand Down Expand Up @@ -447,6 +450,84 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
return records, nil
}

const qModelTriggerChartRecords = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s")
|> filter(fn: (r) => r._field == "trigger_time")
|> group(columns:["requester_uid"])
|> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s)
`

// ListModelTriggerChartRecordsParams contains the required information to
// query the model triggers of a namespace.
type ListModelTriggerChartRecordsParams struct {
NamespaceID string
RequesterUID uuid.UUID
AggregationWindow time.Duration
Start time.Time
Stop time.Time
}

func (i *influxDB) ListModelTriggerChartRecords(
ctx context.Context,
p ListModelTriggerChartRecordsParams,
) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) {
l, _ := logger.GetZapLogger(ctx)
l = l.With(zap.Reflect("triggerChartParams", p))

query := fmt.Sprintf(
qModelTriggerChartRecords,
i.Bucket(),
p.Start.Format(time.RFC3339Nano),
p.Stop.Format(time.RFC3339Nano),
p.RequesterUID.String(),
p.AggregationWindow,
AggregationWindowOffset(p.Start).String(),
)
result, err := i.QueryAPI().Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err)
}

defer result.Close()

record := &mgmtpb.ModelTriggerChartRecord{
RequesterId: p.NamespaceID,
TimeBuckets: []*timestamppb.Timestamp{},
TriggerCounts: []int32{},
}

// Until filtering and grouping are implemented, we'll only have one record
// (total triggers by requester).
records := []*mgmtpb.ModelTriggerChartRecord{record}

for result.Next() {
t := result.Record().Time()
record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t))

v, match := result.Record().Value().(int64)
if !match {
l.With(zap.Time("_time", result.Record().Time())).
Error("Missing count on model trigger chart record.")
}

record.TriggerCounts = append(record.TriggerCounts, int32(v))
}

if result.Err() != nil {
return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err)
}

if result.Record() == nil {
return nil, nil
}

return &mgmtpb.ListModelTriggerChartRecordsResponse{
ModelTriggerChartRecords: records,
}, nil
}

// TranspileFilter transpiles a parsed AIP filter expression to Flux query expression
func (i *influxDB) transpileFilter(filter filtering.Filter) (string, error) {
return (&Transpiler{
Expand Down
42 changes: 42 additions & 0 deletions pkg/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"go.einride.tech/aip/filtering"
"gorm.io/gorm"
Expand Down Expand Up @@ -163,6 +164,47 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg
return pipelineTriggerChartRecords, nil
}

func (s *service) ListModelTriggerChartRecords(
ctx context.Context,
req *mgmtpb.ListModelTriggerChartRecordsRequest,
ctxUserUID uuid.UUID,
) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) {
nsUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID)
if err != nil {
return nil, fmt.Errorf("checking user permissions: %w", err)
}

now := time.Now().UTC()
p := repository.ListModelTriggerChartRecordsParams{
NamespaceID: req.GetRequesterId(),
RequesterUID: nsUID,

// Default values
AggregationWindow: 1 * time.Hour,
Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
Stop: now,
}

if req.GetAggregationWindow() != "" {
window, err := time.ParseDuration(req.GetAggregationWindow())
if err != nil {
return nil, fmt.Errorf("%w: extracting duration from aggregation window: %w", errdomain.ErrInvalidArgument, err)
}

p.AggregationWindow = window
}

if req.GetStart() != nil {
p.Start = req.GetStart().AsTime()
}

if req.GetStop() != nil {
p.Stop = req.GetStop().AsTime()
}

return s.influxDB.ListModelTriggerChartRecords(ctx, p)
}

// GrantedNamespaceUID returns the UID of a namespace, provided the
// authenticated user has access to it.
func (s *service) GrantedNamespaceUID(ctx context.Context, namespaceID string, authenticatedUserUID uuid.UUID) (uuid.UUID, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Service interface {
ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error)
ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error)
ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error)
ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error)

DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)
DBUsers2PBUsers(ctx context.Context, dbUsers []*datamodel.Owner) ([]*mgmtpb.User, error)
Expand Down

0 comments on commit 7fa3523

Please sign in to comment.