Skip to content

Commit 29884c0

Browse files
committed
feat(metric): model trigger chart record api
1 parent c84e555 commit 29884c0

File tree

7 files changed

+164
-5
lines changed

7 files changed

+164
-5
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,5 @@ tmp
110110
bin
111111

112112
# local
113-
.DS_Store
113+
.DS_Store
114+
.idea

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ require (
1616
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
1717
github.com/iancoleman/strcase v0.2.0
1818
github.com/influxdata/influxdb-client-go/v2 v2.12.3
19-
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e
19+
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029090626-ebae4235071d
2020
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
2121
github.com/instill-ai/x v0.4.0-alpha
2222
github.com/knadh/koanf v1.5.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,8 +1090,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
10901090
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
10911091
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
10921092
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
1093-
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e h1:qUCFv38Xl9Gn9MMAYoHR97i7iQ/L3b8zvj/TX/viZlU=
1094-
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241023112228-d36cbd2f1d9e/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
1093+
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029090626-ebae4235071d h1:jF7vN9LdL4MAvuu9xmXy7UR6ZKi+RBLqgzrkGLa83ts=
1094+
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241029090626-ebae4235071d/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
10951095
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
10961096
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
10971097
github.com/instill-ai/x v0.4.0-alpha h1:zQV2VLbSHjMv6gyBN/2mwwrvWk0/mJM6ZKS12AzjfQg=

pkg/handler/publichandler.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ import (
2323
"github.com/instill-ai/mgmt-backend/pkg/logger"
2424
"github.com/instill-ai/mgmt-backend/pkg/service"
2525
"github.com/instill-ai/mgmt-backend/pkg/usage"
26+
"github.com/instill-ai/x/checkfield"
2627

2728
custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel"
2829
healthcheckPB "github.com/instill-ai/protogen-go/common/healthcheck/v1beta"
2930
mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
30-
checkfield "github.com/instill-ai/x/checkfield"
3131
)
3232

3333
// TODO: Validate mask based on the field behavior. Currently, the fields are hard-coded.
@@ -1049,6 +1049,40 @@ func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req
10491049
return &resp, nil
10501050
}
10511051

1052+
// ListModelTriggerChartRecords returns a timeline of model trigger counts for a given requester. The
1053+
// response will contain one set of records (datapoints), representing the amount of triggers in a time bucket.
1054+
func (h *PublicHandler) ListModelTriggerChartRecords(ctx context.Context, req *mgmtPB.ListModelTriggerChartRecordsRequest) (*mgmtPB.ListModelTriggerChartRecordsResponse, error) {
1055+
1056+
eventName := "ListModelTriggerChartRecords"
1057+
ctx, span := tracer.Start(ctx, eventName,
1058+
trace.WithSpanKind(trace.SpanKindServer))
1059+
defer span.End()
1060+
1061+
logUUID, _ := uuid.NewV4()
1062+
logger, _ := logger.GetZapLogger(ctx)
1063+
1064+
ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false)
1065+
if err != nil {
1066+
span.SetStatus(1, err.Error())
1067+
return nil, err
1068+
}
1069+
1070+
resp, err := h.Service.ListModelTriggerChartRecords(ctx, req, ctxUserUID)
1071+
if err != nil {
1072+
span.SetStatus(1, err.Error())
1073+
return nil, fmt.Errorf("fetching credit chart records: %w", err)
1074+
}
1075+
1076+
logger.Info(string(custom_otel.NewLogMessage(
1077+
span,
1078+
logUUID.String(),
1079+
ctxUserUID,
1080+
eventName,
1081+
)))
1082+
1083+
return resp, nil
1084+
}
1085+
10521086
func (h *PublicHandler) ListUserMemberships(ctx context.Context, req *mgmtPB.ListUserMembershipsRequest) (*mgmtPB.ListUserMembershipsResponse, error) {
10531087

10541088
eventName := "ListUserMemberships"

pkg/repository/influx.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strings"
77
"time"
88

9+
"github.com/gofrs/uuid"
910
"github.com/influxdata/influxdb-client-go/v2/api"
1011
"github.com/influxdata/influxdb-client-go/v2/log"
1112
"go.einride.tech/aip/filtering"
@@ -22,6 +23,7 @@ import (
2223
"github.com/instill-ai/mgmt-backend/pkg/logger"
2324
"github.com/instill-ai/x/paginate"
2425

26+
errdomain "github.com/instill-ai/mgmt-backend/pkg/errors"
2527
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
2628
)
2729

@@ -33,6 +35,7 @@ type InfluxDB interface {
3335
QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error)
3436
QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error)
3537
QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error)
38+
ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)
3639

3740
Bucket() string
3841
QueryAPI() api.QueryAPI
@@ -447,6 +450,84 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
447450
return records, nil
448451
}
449452

453+
const qModelTriggerChartRecords = `
454+
from(bucket: "%s")
455+
|> range(start: %s, stop: %s)
456+
|> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s")
457+
|> filter(fn: (r) => r._field == "trigger_time")
458+
|> group(columns:["requester_uid"])
459+
|> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s)
460+
`
461+
462+
// ListModelTriggerChartRecordsParams contains the required information to
463+
// query the model triggers of a namespace.
464+
type ListModelTriggerChartRecordsParams struct {
465+
NamespaceID string
466+
RequesterUID uuid.UUID
467+
AggregationWindow time.Duration
468+
Start time.Time
469+
Stop time.Time
470+
}
471+
472+
func (i *influxDB) ListModelTriggerChartRecords(
473+
ctx context.Context,
474+
p ListModelTriggerChartRecordsParams,
475+
) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) {
476+
l, _ := logger.GetZapLogger(ctx)
477+
l = l.With(zap.Reflect("triggerChartParams", p))
478+
479+
query := fmt.Sprintf(
480+
qModelTriggerChartRecords,
481+
i.Bucket(),
482+
p.Start.Format(time.RFC3339Nano),
483+
p.Stop.Format(time.RFC3339Nano),
484+
p.RequesterUID.String(),
485+
p.AggregationWindow,
486+
AggregationWindowOffset(p.Start).String(),
487+
)
488+
result, err := i.QueryAPI().Query(ctx, query)
489+
if err != nil {
490+
return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err)
491+
}
492+
493+
defer result.Close()
494+
495+
record := &mgmtpb.ModelTriggerChartRecord{
496+
RequesterId: p.NamespaceID,
497+
TimeBuckets: []*timestamppb.Timestamp{},
498+
TriggerCounts: []int32{},
499+
}
500+
501+
// Until filtering and grouping are implemented, we'll only have one record
502+
// (total triggers by requester).
503+
records := []*mgmtpb.ModelTriggerChartRecord{record}
504+
505+
for result.Next() {
506+
t := result.Record().Time()
507+
record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t))
508+
509+
v, match := result.Record().Value().(int64)
510+
if !match {
511+
l.With(zap.Time("_time", result.Record().Time())).
512+
Error("Missing count on model trigger chart record.")
513+
}
514+
515+
record.TriggerCounts = append(record.TriggerCounts, int32(v))
516+
}
517+
518+
if result.Err() != nil {
519+
return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err)
520+
}
521+
522+
if result.Record() == nil {
523+
return nil, nil
524+
}
525+
526+
return &mgmtpb.ListModelTriggerChartRecordsResponse{
527+
ModelTriggerChartRecords: records,
528+
}, nil
529+
}
530+
450531
// TranspileFilter transpiles a parsed AIP filter expression to Flux query expression
451532
func (i *influxDB) transpileFilter(filter filtering.Filter) (string, error) {
452533
return (&Transpiler{

pkg/service/metric.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"strings"
8+
"time"
89

910
"go.einride.tech/aip/filtering"
1011
"gorm.io/gorm"
@@ -163,6 +164,47 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg
163164
return pipelineTriggerChartRecords, nil
164165
}
165166

167+
func (s *service) ListModelTriggerChartRecords(
168+
ctx context.Context,
169+
req *mgmtpb.ListModelTriggerChartRecordsRequest,
170+
ctxUserUID uuid.UUID,
171+
) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) {
172+
nsUID, err := s.GrantedNamespaceUID(ctx, req.GetRequesterId(), ctxUserUID)
173+
if err != nil {
174+
return nil, fmt.Errorf("checking user permissions: %w", err)
175+
}
176+
177+
now := time.Now().UTC()
178+
p := repository.ListModelTriggerChartRecordsParams{
179+
NamespaceID: req.GetRequesterId(),
180+
RequesterUID: nsUID,
181+
182+
// Default values
183+
AggregationWindow: 1 * time.Hour,
184+
Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
185+
Stop: now,
186+
}
187+
188+
if req.GetAggregationWindow() != "" {
189+
window, err := time.ParseDuration(req.GetAggregationWindow())
190+
if err != nil {
191+
return nil, fmt.Errorf("%w: extracting duration from aggregation window: %w", errdomain.ErrInvalidArgument, err)
192+
}
193+
194+
p.AggregationWindow = window
195+
}
196+
197+
if req.GetStart() != nil {
198+
p.Start = req.GetStart().AsTime()
199+
}
200+
201+
if req.GetStop() != nil {
202+
p.Stop = req.GetStop().AsTime()
203+
}
204+
205+
return s.influxDB.ListModelTriggerChartRecords(ctx, p)
206+
}
207+
166208
// GrantedNamespaceUID returns the UID of a namespace, provided the
167209
// authenticated user has access to it.
168210
func (s *service) GrantedNamespaceUID(ctx context.Context, namespaceID string, authenticatedUserUID uuid.UUID) (uuid.UUID, error) {

pkg/service/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ type Service interface {
7373
ListPipelineTriggerRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerRecord, int64, string, error)
7474
ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtpb.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerTableRecord, int64, string, error)
7575
ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtpb.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtpb.PipelineTriggerChartRecord, error)
76+
ListModelTriggerChartRecords(ctx context.Context, req *mgmtpb.ListModelTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtpb.ListModelTriggerChartRecordsResponse, error)
7677

7778
DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtpb.User, error)
7879
DBUsers2PBUsers(ctx context.Context, dbUsers []*datamodel.Owner) ([]*mgmtpb.User, error)

0 commit comments

Comments
 (0)