diff --git a/docs/components/AWS.mdx b/docs/components/AWS.mdx index 2204b13964..2ee160b1b1 100644 --- a/docs/components/AWS.mdx +++ b/docs/components/AWS.mdx @@ -18,6 +18,7 @@ import { CardGrid, LinkCard } from "@astrojs/starlight/components"; ## Actions + @@ -262,6 +263,57 @@ Each image scan event includes: } ``` + + +## CloudWatch • Query Metrics Insights + +The Query Metrics Insights component runs a CloudWatch Metrics Insights query using the GetMetricData API. + +### Use Cases + +- **Observability automation**: Query current metric trends during workflows +- **SLO checks**: Evaluate key service metrics before progressing a deployment +- **Incident response**: Pull grouped metric views to enrich notifications + +### Configuration + +- **Region**: AWS region where the metrics are stored +- **Metrics Insights Query**: SQL-like query in CloudWatch Metrics Insights syntax +- **Lookback Window (minutes)**: Relative time window ending at execution time +- **Max Datapoints**: Maximum datapoints returned by CloudWatch +- **Result Order**: Timestamp ascending or descending order + +### Example Output + +```json +{ + "endTime": "2026-02-12T10:15:00Z", + "maxDatapoints": 1000, + "messages": [], + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId) GROUP BY InstanceId", + "region": "us-east-1", + "requestId": "0f0a2432-ef2b-4ff2-a89f-021f6d77fbde", + "results": [ + { + "id": "q1", + "label": "CPUUtilization", + "messages": [], + "statusCode": "Complete", + "timestamps": [ + "2026-02-12T10:10:00Z", + "2026-02-12T10:05:00Z" + ], + "values": [ + 14.2, + 11.8 + ] + } + ], + "scanBy": "TimestampDescending", + "startTime": "2026-02-12T10:00:00Z" +} +``` + ## CodeArtifact • Copy Package Versions diff --git a/pkg/integrations/aws/aws.go b/pkg/integrations/aws/aws.go index 135915f9a5..4bd7511be4 100644 --- a/pkg/integrations/aws/aws.go +++ b/pkg/integrations/aws/aws.go @@ -141,6 +141,7 @@ func (a *AWS) Components() []core.Component { &ecr.GetImage{}, &ecr.GetImageScanFindings{}, &ecr.ScanImage{}, + &cloudwatch.QueryMetricsInsights{}, &lambda.RunFunction{}, } } diff --git a/pkg/integrations/aws/cloudwatch/client.go b/pkg/integrations/aws/cloudwatch/client.go new file mode 100644 index 0000000000..e3737ffcb3 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/client.go @@ -0,0 +1,277 @@ +package cloudwatch + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/integrations/aws/common" +) + +const ( + serviceName = "monitoring" + apiVersion = "2010-08-01" + contentType = "application/x-www-form-urlencoded; charset=utf-8" + metricsQueryID = "q1" + defaultMetricsInsightsPeriodSeconds = 60 +) + +type Client struct { + http core.HTTPContext + region string + credentials *aws.Credentials + signer *v4.Signer +} + +type QueryMetricsInsightsInput struct { + Query string + StartTime time.Time + EndTime time.Time + ScanBy string + MaxDatapoints int +} + +type QueryMetricsInsightsOutput struct { + RequestID string `json:"requestId"` + Results []MetricDataResult `json:"results"` + Messages []MetricDataMessage `json:"messages,omitempty"` +} + +type MetricDataResult struct { + ID string `json:"id" xml:"Id"` + Label string `json:"label" xml:"Label"` + StatusCode string `json:"statusCode" xml:"StatusCode"` + Timestamps []string `json:"timestamps" xml:"Timestamps>member"` + Values []float64 `json:"values" xml:"Values>member"` + Messages []MetricDataMessage `json:"messages,omitempty" xml:"Messages>member"` +} + +type MetricDataMessage struct { + Code string `json:"code" xml:"Code"` + Value string `json:"value" xml:"Value"` +} + +type getMetricDataResponse struct { + MetricDataResults []MetricDataResult `xml:"GetMetricDataResult>MetricDataResults>member"` + Messages []MetricDataMessage `xml:"GetMetricDataResult>Messages>member"` + NextToken string `xml:"GetMetricDataResult>NextToken"` + RequestID string `xml:"ResponseMetadata>RequestId"` +} + +func NewClient(httpCtx core.HTTPContext, credentials *aws.Credentials, region string) *Client { + return &Client{ + http: httpCtx, + region: strings.TrimSpace(region), + credentials: credentials, + signer: v4.NewSigner(), + } +} + +func (c *Client) QueryMetricsInsights(input QueryMetricsInsightsInput) (*QueryMetricsInsightsOutput, error) { + query := strings.TrimSpace(input.Query) + if query == "" { + return nil, fmt.Errorf("query is required") + } + + startTime := input.StartTime.UTC() + if startTime.IsZero() { + return nil, fmt.Errorf("start time is required") + } + + endTime := input.EndTime.UTC() + if endTime.IsZero() { + return nil, fmt.Errorf("end time is required") + } + + if !endTime.After(startTime) { + return nil, fmt.Errorf("end time must be after start time") + } + + if input.MaxDatapoints < 0 { + return nil, fmt.Errorf("max datapoints must be greater than or equal to zero") + } + + scanBy := strings.TrimSpace(input.ScanBy) + if scanBy == "" { + scanBy = ScanByTimestampDescending + } + + if !isValidScanBy(scanBy) { + return nil, fmt.Errorf("invalid scan by value: %s", scanBy) + } + + output := &QueryMetricsInsightsOutput{} + nextToken := "" + + for { + values := c.getMetricDataValues(query, startTime, endTime, scanBy, input.MaxDatapoints, nextToken) + response := getMetricDataResponse{} + if err := c.postForm(values, &response); err != nil { + return nil, err + } + + output.RequestID = strings.TrimSpace(response.RequestID) + output.Results = mergeMetricDataResults(output.Results, response.MetricDataResults) + output.Messages = append(output.Messages, response.Messages...) + + nextToken = strings.TrimSpace(response.NextToken) + if nextToken == "" { + return output, nil + } + } +} + +func (c *Client) getMetricDataValues( + query string, + startTime time.Time, + endTime time.Time, + scanBy string, + maxDatapoints int, + nextToken string, +) url.Values { + values := url.Values{} + values.Set("Action", "GetMetricData") + values.Set("Version", apiVersion) + values.Set("StartTime", startTime.Format(time.RFC3339)) + values.Set("EndTime", endTime.Format(time.RFC3339)) + values.Set("ScanBy", scanBy) + + if maxDatapoints > 0 { + values.Set("MaxDatapoints", strconv.Itoa(maxDatapoints)) + } + + values.Set("MetricDataQueries.member.1.Id", metricsQueryID) + values.Set("MetricDataQueries.member.1.Expression", query) + values.Set("MetricDataQueries.member.1.ReturnData", "true") + values.Set("MetricDataQueries.member.1.Period", strconv.Itoa(defaultMetricsInsightsPeriodSeconds)) + + if strings.TrimSpace(nextToken) != "" { + values.Set("NextToken", strings.TrimSpace(nextToken)) + } + + return values +} + +func (c *Client) postForm(values url.Values, out any) error { + body := values.Encode() + + req, err := http.NewRequest(http.MethodPost, c.endpoint(), strings.NewReader(body)) + if err != nil { + return fmt.Errorf("failed to build request: %w", err) + } + + req.Header.Set("Content-Type", contentType) + + if err := c.signRequest(req, []byte(body)); err != nil { + return err + } + + res, err := c.http.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer res.Body.Close() + + responseBody, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response: %w", err) + } + + if res.StatusCode < 200 || res.StatusCode >= 300 { + if awsErr := parseError(responseBody); awsErr != nil { + return awsErr + } + return fmt.Errorf("CloudWatch API request failed with %d: %s", res.StatusCode, string(responseBody)) + } + + if out == nil { + return nil + } + + if err := xml.Unmarshal(responseBody, out); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + + return nil +} + +func (c *Client) signRequest(req *http.Request, payload []byte) error { + hash := sha256.Sum256(payload) + payloadHash := hex.EncodeToString(hash[:]) + return c.signer.SignHTTP(context.Background(), *c.credentials, req, payloadHash, serviceName, c.region, time.Now()) +} + +func (c *Client) endpoint() string { + return fmt.Sprintf("https://monitoring.%s.amazonaws.com/", c.region) +} + +func mergeMetricDataResults(existing []MetricDataResult, incoming []MetricDataResult) []MetricDataResult { + if len(incoming) == 0 { + return existing + } + + if len(existing) == 0 { + return incoming + } + + indexByID := map[string]int{} + for i, result := range existing { + indexByID[result.ID] = i + } + + for _, result := range incoming { + index, ok := indexByID[result.ID] + if !ok { + indexByID[result.ID] = len(existing) + existing = append(existing, result) + continue + } + + existing[index].Timestamps = append(existing[index].Timestamps, result.Timestamps...) + existing[index].Values = append(existing[index].Values, result.Values...) + existing[index].Messages = append(existing[index].Messages, result.Messages...) + if strings.TrimSpace(result.Label) != "" { + existing[index].Label = result.Label + } + if strings.TrimSpace(result.StatusCode) != "" { + existing[index].StatusCode = result.StatusCode + } + } + + return existing +} + +func parseError(body []byte) *common.Error { + var payload struct { + Error struct { + Code string `xml:"Code"` + Message string `xml:"Message"` + } `xml:"Error"` + } + + if err := xml.Unmarshal(body, &payload); err != nil { + return nil + } + + code := strings.TrimSpace(payload.Error.Code) + message := strings.TrimSpace(payload.Error.Message) + if code == "" && message == "" { + return nil + } + + return &common.Error{ + Code: code, + Message: message, + } +} diff --git a/pkg/integrations/aws/cloudwatch/example.go b/pkg/integrations/aws/cloudwatch/example.go index 5e5ad28fa0..b0c6d33c67 100644 --- a/pkg/integrations/aws/cloudwatch/example.go +++ b/pkg/integrations/aws/cloudwatch/example.go @@ -10,9 +10,23 @@ import ( //go:embed example_data_on_alarm.json var exampleDataOnAlarmBytes []byte +//go:embed example_output_query_metrics_insights.json +var exampleOutputQueryMetricsInsightsBytes []byte + var exampleDataOnAlarmOnce sync.Once var exampleDataOnAlarm map[string]any +var exampleOutputQueryMetricsInsightsOnce sync.Once +var exampleOutputQueryMetricsInsights map[string]any + func (t *OnAlarm) ExampleData() map[string]any { return utils.UnmarshalEmbeddedJSON(&exampleDataOnAlarmOnce, exampleDataOnAlarmBytes, &exampleDataOnAlarm) } + +func (c *QueryMetricsInsights) ExampleOutput() map[string]any { + return utils.UnmarshalEmbeddedJSON( + &exampleOutputQueryMetricsInsightsOnce, + exampleOutputQueryMetricsInsightsBytes, + &exampleOutputQueryMetricsInsights, + ) +} diff --git a/pkg/integrations/aws/cloudwatch/example_output_query_metrics_insights.json b/pkg/integrations/aws/cloudwatch/example_output_query_metrics_insights.json new file mode 100644 index 0000000000..dd21dd3920 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/example_output_query_metrics_insights.json @@ -0,0 +1,26 @@ +{ + "endTime": "2026-02-12T10:15:00Z", + "maxDatapoints": 1000, + "messages": [], + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId) GROUP BY InstanceId", + "region": "us-east-1", + "requestId": "0f0a2432-ef2b-4ff2-a89f-021f6d77fbde", + "results": [ + { + "id": "q1", + "label": "CPUUtilization", + "messages": [], + "statusCode": "Complete", + "timestamps": [ + "2026-02-12T10:10:00Z", + "2026-02-12T10:05:00Z" + ], + "values": [ + 14.2, + 11.8 + ] + } + ], + "scanBy": "TimestampDescending", + "startTime": "2026-02-12T10:00:00Z" +} diff --git a/pkg/integrations/aws/cloudwatch/query_metrics_insights.go b/pkg/integrations/aws/cloudwatch/query_metrics_insights.go new file mode 100644 index 0000000000..65b7e1bd29 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/query_metrics_insights.go @@ -0,0 +1,288 @@ +package cloudwatch + +import ( + "fmt" + "net/http" + "strings" + "time" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/integrations/aws/common" +) + +const ( + defaultLookbackMinutes = 15 + defaultMaxDatapoints = 1000 + maxAllowedCloudWatchDatapoints = 100800 + ScanByTimestampDescending = "TimestampDescending" + ScanByTimestampAscending = "TimestampAscending" + QueryMetricsInsightsEventType = "aws.cloudwatch.metricsInsights.query" +) + +var AllScanByOptions = []configuration.FieldOption{ + { + Label: "Timestamp Descending", + Value: ScanByTimestampDescending, + }, + { + Label: "Timestamp Ascending", + Value: ScanByTimestampAscending, + }, +} + +type QueryMetricsInsights struct{} + +type QueryMetricsInsightsConfiguration struct { + Region string `json:"region" mapstructure:"region"` + Query string `json:"query" mapstructure:"query"` + LookbackMinutes int `json:"lookbackMinutes" mapstructure:"lookbackMinutes"` + MaxDatapoints int `json:"maxDatapoints" mapstructure:"maxDatapoints"` + ScanBy string `json:"scanBy" mapstructure:"scanBy"` +} + +func (c *QueryMetricsInsights) Name() string { + return "aws.cloudwatch.queryMetricsInsights" +} + +func (c *QueryMetricsInsights) Label() string { + return "CloudWatch • Query Metrics Insights" +} + +func (c *QueryMetricsInsights) Description() string { + return "Run a CloudWatch Metrics Insights query against AWS metrics" +} + +func (c *QueryMetricsInsights) Documentation() string { + return `The Query Metrics Insights component runs a CloudWatch Metrics Insights query using the GetMetricData API. + +## Use Cases + +- **Observability automation**: Query current metric trends during workflows +- **SLO checks**: Evaluate key service metrics before progressing a deployment +- **Incident response**: Pull grouped metric views to enrich notifications + +## Configuration + +- **Region**: AWS region where the metrics are stored +- **Metrics Insights Query**: SQL-like query in CloudWatch Metrics Insights syntax +- **Lookback Window (minutes)**: Relative time window ending at execution time +- **Max Datapoints**: Maximum datapoints returned by CloudWatch +- **Result Order**: Timestamp ascending or descending order + +## Notes + +- The component automatically sets the query window to now minus lookback through now +- It emits one payload containing query metadata and all metric result series` +} + +func (c *QueryMetricsInsights) Icon() string { + return "aws" +} + +func (c *QueryMetricsInsights) Color() string { + return "gray" +} + +func (c *QueryMetricsInsights) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{core.DefaultOutputChannel} +} + +func (c *QueryMetricsInsights) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "region", + Label: "Region", + Type: configuration.FieldTypeSelect, + Required: true, + Default: "us-east-1", + TypeOptions: &configuration.TypeOptions{ + Select: &configuration.SelectTypeOptions{ + Options: common.AllRegions, + }, + }, + }, + { + Name: "query", + Label: "Metrics Insights Query", + Type: configuration.FieldTypeText, + Required: true, + Description: "CloudWatch Metrics Insights query string (for example: SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId) GROUP BY InstanceId)", + Placeholder: "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId) GROUP BY InstanceId", + }, + { + Name: "lookbackMinutes", + Label: "Lookback Window (minutes)", + Type: configuration.FieldTypeNumber, + Required: false, + Default: defaultLookbackMinutes, + Description: "How many minutes back from now to query", + TypeOptions: &configuration.TypeOptions{ + Number: &configuration.NumberTypeOptions{ + Min: func() *int { min := 1; return &min }(), + Max: func() *int { max := 1440; return &max }(), + }, + }, + }, + { + Name: "maxDatapoints", + Label: "Max Datapoints", + Type: configuration.FieldTypeNumber, + Required: false, + Default: defaultMaxDatapoints, + Description: "Maximum datapoints CloudWatch should return", + TypeOptions: &configuration.TypeOptions{ + Number: &configuration.NumberTypeOptions{ + Min: func() *int { min := 1; return &min }(), + Max: func() *int { max := maxAllowedCloudWatchDatapoints; return &max }(), + }, + }, + }, + { + Name: "scanBy", + Label: "Result Order", + Type: configuration.FieldTypeSelect, + Required: false, + Default: ScanByTimestampDescending, + TypeOptions: &configuration.TypeOptions{ + Select: &configuration.SelectTypeOptions{ + Options: AllScanByOptions, + }, + }, + }, + } +} + +func (c *QueryMetricsInsights) Setup(ctx core.SetupContext) error { + _, err := c.parseConfiguration(ctx.Configuration) + return err +} + +func (c *QueryMetricsInsights) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (c *QueryMetricsInsights) Execute(ctx core.ExecutionContext) error { + config, err := c.parseConfiguration(ctx.Configuration) + if err != nil { + return err + } + + credentials, err := common.CredentialsFromInstallation(ctx.Integration) + if err != nil { + return fmt.Errorf("failed to get AWS credentials: %w", err) + } + + endTime := time.Now().UTC() + startTime := endTime.Add(-time.Duration(config.LookbackMinutes) * time.Minute) + + client := NewClient(ctx.HTTP, credentials, config.Region) + response, err := client.QueryMetricsInsights(QueryMetricsInsightsInput{ + Query: config.Query, + StartTime: startTime, + EndTime: endTime, + ScanBy: config.ScanBy, + MaxDatapoints: config.MaxDatapoints, + }) + if err != nil { + return fmt.Errorf("failed to execute metrics insights query: %w", err) + } + + return ctx.ExecutionState.Emit( + core.DefaultOutputChannel.Name, + QueryMetricsInsightsEventType, + []any{ + map[string]any{ + "region": config.Region, + "query": config.Query, + "startTime": startTime.Format(time.RFC3339), + "endTime": endTime.Format(time.RFC3339), + "scanBy": config.ScanBy, + "maxDatapoints": config.MaxDatapoints, + "requestId": response.RequestID, + "results": response.Results, + "messages": response.Messages, + }, + }, + ) +} + +func (c *QueryMetricsInsights) Actions() []core.Action { + return []core.Action{} +} + +func (c *QueryMetricsInsights) HandleAction(ctx core.ActionContext) error { + return nil +} + +func (c *QueryMetricsInsights) HandleWebhook(ctx core.WebhookRequestContext) (int, error) { + return http.StatusOK, nil +} + +func (c *QueryMetricsInsights) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (c *QueryMetricsInsights) Cleanup(ctx core.SetupContext) error { + return nil +} + +func (c *QueryMetricsInsights) parseConfiguration(rawConfiguration any) (QueryMetricsInsightsConfiguration, error) { + config := QueryMetricsInsightsConfiguration{} + if err := mapstructure.Decode(rawConfiguration, &config); err != nil { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("failed to decode configuration: %w", err) + } + + config.Region = strings.TrimSpace(config.Region) + config.Query = strings.TrimSpace(config.Query) + config.ScanBy = strings.TrimSpace(config.ScanBy) + + if config.LookbackMinutes < 0 { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("lookback minutes must be greater than or equal to zero") + } + if config.MaxDatapoints < 0 { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("max datapoints must be greater than or equal to zero") + } + + if config.LookbackMinutes == 0 { + config.LookbackMinutes = defaultLookbackMinutes + } + if config.MaxDatapoints == 0 { + config.MaxDatapoints = defaultMaxDatapoints + } + if config.ScanBy == "" { + config.ScanBy = ScanByTimestampDescending + } + + if config.Region == "" { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("region is required") + } + if config.Query == "" { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("metrics insights query is required") + } + if config.LookbackMinutes < 1 { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("lookback minutes must be greater than zero") + } + if config.MaxDatapoints < 1 { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("max datapoints must be greater than zero") + } + if config.MaxDatapoints > maxAllowedCloudWatchDatapoints { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("max datapoints must be less than or equal to %d", maxAllowedCloudWatchDatapoints) + } + if !isValidScanBy(config.ScanBy) { + return QueryMetricsInsightsConfiguration{}, fmt.Errorf("invalid scan by value: %s", config.ScanBy) + } + + return config, nil +} + +func isValidScanBy(scanBy string) bool { + switch scanBy { + case ScanByTimestampAscending, ScanByTimestampDescending: + return true + default: + return false + } +} diff --git a/pkg/integrations/aws/cloudwatch/query_metrics_insights_test.go b/pkg/integrations/aws/cloudwatch/query_metrics_insights_test.go new file mode 100644 index 0000000000..453c2e3cb5 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/query_metrics_insights_test.go @@ -0,0 +1,264 @@ +package cloudwatch + +import ( + "io" + "net/http" + "net/url" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func Test__QueryMetricsInsights__Setup(t *testing.T) { + component := &QueryMetricsInsights{} + + t.Run("invalid configuration -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: "invalid", + }) + + require.ErrorContains(t, err, "failed to decode configuration") + }) + + t.Run("missing region -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId)", + }, + }) + + require.ErrorContains(t, err, "region is required") + }) + + t.Run("missing query -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + }, + }) + + require.ErrorContains(t, err, "metrics insights query is required") + }) + + t.Run("invalid scanBy -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId)", + "scanBy": "descending", + }, + }) + + require.ErrorContains(t, err, "invalid scan by value") + }) + + t.Run("negative lookback -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId)", + "lookbackMinutes": -5, + }, + }) + + require.ErrorContains(t, err, "lookback minutes must be greater than or equal to zero") + }) + + t.Run("max datapoints above allowed limit -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId)", + "maxDatapoints": maxAllowedCloudWatchDatapoints + 1, + }, + }) + + require.ErrorContains(t, err, "max datapoints must be less than or equal to") + }) + + t.Run("valid configuration -> ok", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId)", + "lookbackMinutes": 10, + "maxDatapoints": 500, + "scanBy": ScanByTimestampDescending, + }, + }) + + require.NoError(t, err) + }) +} + +func Test__QueryMetricsInsights__Execute(t *testing.T) { + component := &QueryMetricsInsights{} + + t.Run("missing credentials -> error", func(t *testing.T) { + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId)", + }, + ExecutionState: &contexts.ExecutionStateContext{KVs: map[string]string{}}, + Integration: &contexts.IntegrationContext{Secrets: map[string]core.IntegrationSecret{}}, + }) + + require.ErrorContains(t, err, "AWS session credentials are missing") + }) + + t.Run("paginated response -> emits merged metric results", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(getMetricDataResponsePageOne())), + }, + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(getMetricDataResponsePageTwo())), + }, + }, + } + + executionState := &contexts.ExecutionStateContext{KVs: map[string]string{}} + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "query": "SELECT AVG(CPUUtilization) FROM SCHEMA(\"AWS/EC2\", InstanceId) GROUP BY InstanceId", + "lookbackMinutes": 20, + "maxDatapoints": 250, + "scanBy": ScanByTimestampDescending, + }, + HTTP: httpContext, + ExecutionState: executionState, + Integration: &contexts.IntegrationContext{ + Secrets: awsSessionSecrets(), + }, + }) + + require.NoError(t, err) + assert.Equal(t, QueryMetricsInsightsEventType, executionState.Type) + require.Len(t, executionState.Payloads, 1) + + payload := executionState.Payloads[0].(map[string]any)["data"].(map[string]any) + assert.Equal(t, "us-east-1", payload["region"]) + assert.Equal(t, 250, payload["maxDatapoints"]) + assert.Equal(t, ScanByTimestampDescending, payload["scanBy"]) + assert.Equal(t, "request-2", payload["requestId"]) + + startTimeValue, ok := payload["startTime"].(string) + require.True(t, ok) + endTimeValue, ok := payload["endTime"].(string) + require.True(t, ok) + startTime, err := time.Parse(time.RFC3339, startTimeValue) + require.NoError(t, err) + endTime, err := time.Parse(time.RFC3339, endTimeValue) + require.NoError(t, err) + assert.True(t, endTime.After(startTime)) + + results, ok := payload["results"].([]MetricDataResult) + require.True(t, ok) + require.Len(t, results, 1) + assert.Equal(t, "q1", results[0].ID) + assert.Equal(t, "CPUUtilization", results[0].Label) + assert.Equal(t, "Complete", results[0].StatusCode) + assert.Equal(t, []string{"2026-02-12T10:10:00Z", "2026-02-12T10:05:00Z"}, results[0].Timestamps) + assert.Equal(t, []float64{10.5, 11.5}, results[0].Values) + + messages, ok := payload["messages"].([]MetricDataMessage) + require.True(t, ok) + require.Len(t, messages, 1) + assert.Equal(t, "Info", messages[0].Code) + + require.Len(t, httpContext.Requests, 2) + assert.Equal(t, "https://monitoring.us-east-1.amazonaws.com/", httpContext.Requests[0].URL.String()) + assert.Equal(t, "https://monitoring.us-east-1.amazonaws.com/", httpContext.Requests[1].URL.String()) + + firstRequestBody, err := io.ReadAll(httpContext.Requests[0].Body) + require.NoError(t, err) + firstValues, err := url.ParseQuery(string(firstRequestBody)) + require.NoError(t, err) + assert.Equal(t, "GetMetricData", firstValues.Get("Action")) + assert.Equal(t, "2010-08-01", firstValues.Get("Version")) + assert.Equal(t, "q1", firstValues.Get("MetricDataQueries.member.1.Id")) + assert.Equal(t, "60", firstValues.Get("MetricDataQueries.member.1.Period")) + assert.Equal(t, "", firstValues.Get("NextToken")) + + secondRequestBody, err := io.ReadAll(httpContext.Requests[1].Body) + require.NoError(t, err) + secondValues, err := url.ParseQuery(string(secondRequestBody)) + require.NoError(t, err) + assert.Equal(t, "token-1", secondValues.Get("NextToken")) + }) +} + +func awsSessionSecrets() map[string]core.IntegrationSecret { + return map[string]core.IntegrationSecret{ + "accessKeyId": {Name: "accessKeyId", Value: []byte("key")}, + "secretAccessKey": {Name: "secretAccessKey", Value: []byte("secret")}, + "sessionToken": {Name: "sessionToken", Value: []byte("token")}, + } +} + +func getMetricDataResponsePageOne() string { + return ` + + + + + q1 + + PartialData + + 2026-02-12T10:10:00Z + + + 10.5 + + + + token-1 + + + request-1 + + +` +} + +func getMetricDataResponsePageTwo() string { + return ` + + + + + q1 + + Complete + + 2026-02-12T10:05:00Z + + + 11.5 + + + + + + Info + Query completed successfully + + + + + request-2 + + +` +} diff --git a/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/query_metrics_insights.ts b/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/query_metrics_insights.ts new file mode 100644 index 0000000000..69f9b1db68 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/query_metrics_insights.ts @@ -0,0 +1,134 @@ +import { + ComponentBaseContext, + ComponentBaseMapper, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + SubtitleContext, +} from "../../types"; +import { ComponentBaseProps, ComponentBaseSpec, EventSection } from "@/ui/componentBase"; +import { MetadataItem } from "@/ui/metadataList"; +import awsCloudwatchIcon from "@/assets/icons/integrations/aws.cloudwatch.svg"; +import { formatTimeAgo } from "@/utils/date"; +import { formatTimestampInUserTimezone } from "@/utils/timezone"; +import { getBackgroundColorClass, getColorClass } from "@/utils/colors"; +import { getState, getStateMap, getTriggerRenderer } from "../.."; +import { numberOrZero, stringOrDash } from "../../utils"; +import { CloudWatchMetricsInsightsOutput } from "./types"; + +interface QueryMetricsInsightsConfiguration { + region?: string; + query?: string; + lookbackMinutes?: number; + maxDatapoints?: number; + scanBy?: string; +} + +export const queryMetricsInsightsMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const componentName = context.componentDefinition.name || "unknown"; + + return { + title: context.node.name || context.componentDefinition.label || "Unnamed component", + iconSrc: awsCloudwatchIcon, + iconColor: getColorClass(context.componentDefinition.color), + collapsedBackground: getBackgroundColorClass(context.componentDefinition.color), + collapsed: context.node.isCollapsed, + eventSections: lastExecution + ? queryMetricsInsightsEventSections(context.nodes, lastExecution, componentName) + : undefined, + includeEmptyState: !lastExecution, + metadata: queryMetricsInsightsMetadataList(context.node), + specs: queryMetricsInsightsSpecs(context.node), + eventStateMap: getStateMap(componentName), + }; + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const outputs = context.execution.outputs as { default?: OutputPayload[] } | undefined; + const result = outputs?.default?.[0]?.data as CloudWatchMetricsInsightsOutput | undefined; + if (!result) { + return {}; + } + + const datapoints = (result.results || []).reduce( + (count, metric) => count + numberOrZero(metric.values?.length), + 0, + ); + + return { + Region: stringOrDash(result.region), + "Request ID": stringOrDash(result.requestId), + "Start Time": result.startTime ? formatTimestampInUserTimezone(result.startTime) : "-", + "End Time": result.endTime ? formatTimestampInUserTimezone(result.endTime) : "-", + "Result Series": numberOrZero(result.results?.length).toString(), + Datapoints: datapoints.toString(), + Messages: numberOrZero(result.messages?.length).toString(), + }; + }, + + subtitle(context: SubtitleContext): string { + if (!context.execution.createdAt) { + return ""; + } + return formatTimeAgo(new Date(context.execution.createdAt)); + }, +}; + +function queryMetricsInsightsMetadataList(node: NodeInfo): MetadataItem[] { + const metadata: MetadataItem[] = []; + const configuration = node.configuration as QueryMetricsInsightsConfiguration | undefined; + + if (configuration?.region) { + metadata.push({ icon: "globe", label: configuration.region }); + } + + if (configuration?.lookbackMinutes) { + metadata.push({ icon: "clock", label: `Last ${configuration.lookbackMinutes}m` }); + } + + if (configuration?.scanBy) { + metadata.push({ icon: "arrow-up-down", label: configuration.scanBy }); + } + + return metadata; +} + +function queryMetricsInsightsSpecs(node: NodeInfo): ComponentBaseSpec[] { + const specs: ComponentBaseSpec[] = []; + const configuration = node.configuration as QueryMetricsInsightsConfiguration | undefined; + + if (configuration?.query) { + specs.push({ + title: "query", + tooltipTitle: "Metrics Insights query", + iconSlug: "search", + value: configuration.query, + contentType: "text", + }); + } + + return specs; +} + +function queryMetricsInsightsEventSections( + nodes: NodeInfo[], + execution: ExecutionInfo, + componentName: string, +): EventSection[] { + const rootTriggerNode = nodes.find((n) => n.id === execution.rootEvent?.nodeId); + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode?.componentName!); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: execution.rootEvent }); + + return [ + { + receivedAt: new Date(execution.createdAt!), + eventTitle: title, + eventSubtitle: formatTimeAgo(new Date(execution.createdAt!)), + eventState: getState(componentName)(execution), + eventId: execution.rootEvent?.id!, + }, + ]; +} diff --git a/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/types.ts b/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/types.ts index 760d666f2a..9ea72a036e 100644 --- a/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/types.ts +++ b/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/types.ts @@ -17,3 +17,29 @@ export interface CloudWatchAlarmEvent { "detail-type"?: string; detail?: CloudWatchAlarmDetail; } + +export interface CloudWatchMetricDataMessage { + code?: string; + value?: string; +} + +export interface CloudWatchMetricDataResult { + id?: string; + label?: string; + statusCode?: string; + timestamps?: string[]; + values?: number[]; + messages?: CloudWatchMetricDataMessage[]; +} + +export interface CloudWatchMetricsInsightsOutput { + region?: string; + query?: string; + startTime?: string; + endTime?: string; + scanBy?: string; + maxDatapoints?: number; + requestId?: string; + results?: CloudWatchMetricDataResult[]; + messages?: CloudWatchMetricDataMessage[]; +} diff --git a/web_src/src/pages/workflowv2/mappers/aws/index.ts b/web_src/src/pages/workflowv2/mappers/aws/index.ts index e431554a64..4af1ac3db8 100644 --- a/web_src/src/pages/workflowv2/mappers/aws/index.ts +++ b/web_src/src/pages/workflowv2/mappers/aws/index.ts @@ -15,6 +15,7 @@ import { deleteRepositoryMapper } from "./codeartifact/delete_repository"; import { disposePackageVersionsMapper } from "./codeartifact/dispose_package_versions"; import { updatePackageVersionsStatusMapper } from "./codeartifact/update_package_versions_status"; import { onAlarmTriggerRenderer } from "./cloudwatch/on_alarm"; +import { queryMetricsInsightsMapper } from "./cloudwatch/query_metrics_insights"; export const componentMappers: Record = { "lambda.runFunction": runFunctionMapper, @@ -28,6 +29,7 @@ export const componentMappers: Record = { "codeArtifact.disposePackageVersions": disposePackageVersionsMapper, "codeArtifact.getPackageVersion": getPackageVersionMapper, "codeArtifact.updatePackageVersionsStatus": updatePackageVersionsStatusMapper, + "cloudwatch.queryMetricsInsights": queryMetricsInsightsMapper, }; export const triggerRenderers: Record = { @@ -48,4 +50,5 @@ export const eventStateRegistry: Record = { "codeArtifact.disposePackageVersions": buildActionStateRegistry("disposed"), "codeArtifact.getPackageVersion": buildActionStateRegistry("retrieved"), "codeArtifact.updatePackageVersionsStatus": buildActionStateRegistry("updated"), + "cloudwatch.queryMetricsInsights": buildActionStateRegistry("queried"), };