|
6 | 6 | "strings"
|
7 | 7 | "time"
|
8 | 8 |
|
| 9 | + "github.com/gofrs/uuid" |
9 | 10 | "github.com/influxdata/influxdb-client-go/v2/api"
|
10 | 11 | "github.com/influxdata/influxdb-client-go/v2/log"
|
11 | 12 | "go.einride.tech/aip/filtering"
|
@@ -33,6 +34,7 @@ type InfluxDB interface {
|
33 | 34 | QueryPipelineTriggerRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtpb.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error)
|
34 | 35 | QueryPipelineTriggerTableRecords(ctx context.Context, owner string, ownerQueryString string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerTableRecord, totalSize int64, nextPageToken string, err error)
|
35 | 36 | QueryPipelineTriggerChartRecords(ctx context.Context, owner string, ownerQueryString string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtpb.PipelineTriggerChartRecord, err error)
|
| 37 | + ListModelTriggerChartRecords(ctx context.Context, p ListModelTriggerChartRecordsParams) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) |
36 | 38 |
|
37 | 39 | Bucket() string
|
38 | 40 | QueryAPI() api.QueryAPI
|
@@ -405,7 +407,7 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
|
405 | 407 | }
|
406 | 408 |
|
407 | 409 | if result.Record().Table() != currentTablePosition {
|
408 |
| - chartRecord = &mgmtpb.PipelineTriggerChartRecord{} |
| 410 | + chartRecord = &mgmtpb.PipelineTriggerChartRecord{} // only insert a new object when iterated to a new model |
409 | 411 |
|
410 | 412 | if v, match := result.Record().ValueByKey(constant.PipelineID).(string); match {
|
411 | 413 | chartRecord.PipelineId = v
|
@@ -447,6 +449,179 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
|
447 | 449 | return records, nil
|
448 | 450 | }
|
449 | 451 |
|
| 452 | +// todo: merge changes for new pipeline dashboard endpoints and refactor this part |
| 453 | +// const qModelTriggerChartRecords = ` |
| 454 | +// from(bucket: "%s") |
| 455 | +// |> range(start: %s, stop: %s) |
| 456 | +// |> filter(fn: (r) => r._measurement == "model.trigger.v1" and r.requester_uid == "%s") |
| 457 | +// |> filter(fn: (r) => r._field == "trigger_time") |
| 458 | +// |> group(columns:["requester_uid"]) |
| 459 | +// |> aggregateWindow(every: %s, column:"_value", fn: count, createEmpty: true, offset: %s) |
| 460 | +// ` |
| 461 | + |
| 462 | +// ListModelTriggerChartRecordsParams contains the required information to |
| 463 | +// query the model triggers of a namespace. |
| 464 | +type ListModelTriggerChartRecordsParams struct { |
| 465 | + NamespaceID string |
| 466 | + RequesterUID uuid.UUID |
| 467 | + AggregationWindow time.Duration |
| 468 | + Start time.Time |
| 469 | + Stop time.Time |
| 470 | +} |
| 471 | + |
| 472 | +func (i *influxDB) ListModelTriggerChartRecords( |
| 473 | + ctx context.Context, |
| 474 | + p ListModelTriggerChartRecordsParams, |
| 475 | +) (*mgmtpb.ListModelTriggerChartRecordsResponse, error) { |
| 476 | + // todo: merge changes for new pipeline dashboard endpoints and refactor this part |
| 477 | + // l, _ := logger.GetZapLogger(ctx) |
| 478 | + // l = l.With(zap.Reflect("triggerChartParams", p)) |
| 479 | + |
| 480 | + // query := fmt.Sprintf( |
| 481 | + // qModelTriggerChartRecords, |
| 482 | + // i.Bucket(), |
| 483 | + // p.Start.Format(time.RFC3339Nano), |
| 484 | + // p.Stop.Format(time.RFC3339Nano), |
| 485 | + // p.RequesterUID.String(), |
| 486 | + // p.AggregationWindow, |
| 487 | + // AggregationWindowOffset(p.Start).String(), |
| 488 | + // ) |
| 489 | + // result, err := i.QueryAPI().Query(ctx, query) |
| 490 | + // if err != nil { |
| 491 | + // return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err) |
| 492 | + // } |
| 493 | + // |
| 494 | + // defer result.Close() |
| 495 | + // |
| 496 | + // record := &mgmtpb.ModelTriggerChartRecord{ |
| 497 | + // RequesterId: p.NamespaceID, |
| 498 | + // TimeBuckets: []*timestamppb.Timestamp{}, |
| 499 | + // TriggerCounts: []int32{}, |
| 500 | + // } |
| 501 | + // |
| 502 | + // // Until filtering and grouping are implemented, we'll only have one record |
| 503 | + // // (total triggers by requester). |
| 504 | + // records := []*mgmtpb.ModelTriggerChartRecord{record} |
| 505 | + // |
| 506 | + // for result.Next() { |
| 507 | + // t := result.Record().Time() |
| 508 | + // record.TimeBuckets = append(record.TimeBuckets, timestamppb.New(t)) |
| 509 | + // |
| 510 | + // v, match := result.Record().Value().(int64) |
| 511 | + // if !match { |
| 512 | + // l.With(zap.Time("_time", result.Record().Time())). |
| 513 | + // Error("Missing count on model trigger chart record.") |
| 514 | + // } |
| 515 | + // |
| 516 | + // record.TriggerCounts = append(record.TriggerCounts, int32(v)) |
| 517 | + // } |
| 518 | + // |
| 519 | + // if result.Err() != nil { |
| 520 | + // return nil, fmt.Errorf("collecting information from model trigger chart records: %w", err) |
| 521 | + // } |
| 522 | + // |
| 523 | + // if result.Record() == nil { |
| 524 | + // return nil, nil |
| 525 | + // } |
| 526 | + // |
| 527 | + // return &mgmtpb.ListModelTriggerChartRecordsResponse{ |
| 528 | + // ModelTriggerChartRecords: records, |
| 529 | + // }, nil |
| 530 | + logger, _ := logger.GetZapLogger(ctx) |
| 531 | + |
| 532 | + query := fmt.Sprintf( |
| 533 | + `base = |
| 534 | + from(bucket: "%s") |
| 535 | + |> range(start: %v, stop: %v) |
| 536 | + |> filter(fn: (r) => r["_measurement"] == "model.trigger.v1" and r.requester_uid == "%s") |
| 537 | + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |
| 538 | + bucketBase = |
| 539 | + base |
| 540 | + |> group(columns: ["model_uid"]) |
| 541 | + |> sort(columns: ["trigger_time"]) |
| 542 | + bucketTrigger = |
| 543 | + bucketBase |
| 544 | + |> aggregateWindow( |
| 545 | + every: %v, |
| 546 | + column: "trigger_time", |
| 547 | + fn: count, |
| 548 | + createEmpty: false, |
| 549 | + ) |
| 550 | + bucketDuration = |
| 551 | + bucketBase |
| 552 | + |> aggregateWindow( |
| 553 | + every: %v, |
| 554 | + fn: sum, |
| 555 | + column: "compute_time_duration", |
| 556 | + createEmpty: false, |
| 557 | + ) |
| 558 | + bucket = |
| 559 | + join( |
| 560 | + tables: {t1: bucketTrigger, t2: bucketDuration}, |
| 561 | + on: ["_start", "_stop", "_time", "model_uid"], |
| 562 | + ) |
| 563 | + nameMap = |
| 564 | + base |
| 565 | + |> keep(columns: ["trigger_time", "model_id", "model_uid"]) |
| 566 | + |> group(columns: ["model_uid"]) |
| 567 | + |> top(columns: ["trigger_time"], n: 1) |
| 568 | + |> drop(columns: ["trigger_time"]) |
| 569 | + join(tables: {t1: bucket, t2: nameMap}, on: ["model_uid"])`, |
| 570 | + i.bucket, |
| 571 | + p.Start.Format(time.RFC3339Nano), |
| 572 | + p.Stop.Format(time.RFC3339Nano), |
| 573 | + p.RequesterUID.String(), |
| 574 | + p.AggregationWindow, |
| 575 | + p.AggregationWindow, |
| 576 | + ) |
| 577 | + |
| 578 | + result, err := i.api.Query(ctx, query) |
| 579 | + if err != nil { |
| 580 | + return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) |
| 581 | + } |
| 582 | + |
| 583 | + var currentTablePosition = -1 |
| 584 | + var chartRecord *mgmtpb.ModelTriggerChartRecord |
| 585 | + var records []*mgmtpb.ModelTriggerChartRecord |
| 586 | + // Iterate over query response |
| 587 | + for result.Next() { |
| 588 | + // Notice when group key has changed |
| 589 | + if result.TableChanged() { |
| 590 | + logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String())) |
| 591 | + } |
| 592 | + |
| 593 | + if result.Record().Table() != currentTablePosition { |
| 594 | + chartRecord = &mgmtpb.ModelTriggerChartRecord{} |
| 595 | + |
| 596 | + if v, match := result.Record().ValueByKey(constant.ModelID).(string); match { |
| 597 | + chartRecord.ModelId = &v |
| 598 | + } |
| 599 | + chartRecord.TimeBuckets = []*timestamppb.Timestamp{} |
| 600 | + chartRecord.TriggerCounts = []int32{} |
| 601 | + records = append(records, chartRecord) |
| 602 | + currentTablePosition = result.Record().Table() |
| 603 | + } |
| 604 | + |
| 605 | + if v, match := result.Record().ValueByKey("_time").(time.Time); match { |
| 606 | + chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(v)) |
| 607 | + } |
| 608 | + if v, match := result.Record().ValueByKey(constant.TriggerTime).(int64); match { |
| 609 | + chartRecord.TriggerCounts = append(chartRecord.TriggerCounts, int32(v)) |
| 610 | + } |
| 611 | + } |
| 612 | + // Check for an error |
| 613 | + if result.Err() != nil { |
| 614 | + return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) |
| 615 | + } |
| 616 | + if result.Record() == nil { |
| 617 | + return nil, nil |
| 618 | + } |
| 619 | + |
| 620 | + return &mgmtpb.ListModelTriggerChartRecordsResponse{ |
| 621 | + ModelTriggerChartRecords: records, |
| 622 | + }, nil |
| 623 | +} |
| 624 | + |
450 | 625 | // TranspileFilter transpiles a parsed AIP filter expression to Flux query expression
|
451 | 626 | func (i *influxDB) transpileFilter(filter filtering.Filter) (string, error) {
|
452 | 627 | return (&Transpiler{
|
|
0 commit comments