Skip to content

Refactor azure queue scaler #6269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

const (
defaultTargetQueueLength = 5
targetQueueLengthDefault = 5
activationTargetQueueLengthDefault = 0
defaultScaleOnInFlight = true
Expand Down
124 changes: 34 additions & 90 deletions pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
Expand All @@ -34,37 +33,30 @@ import (
)

const (
queueLengthMetricName = "queueLength"
activationQueueLengthMetricName = "activationQueueLength"
defaultTargetQueueLength = 5
externalMetricType = "External"
QueueLengthStrategyAll string = "all"
QueueLengthStrategyVisibleOnly string = "visibleonly"
externalMetricType = "External"
queueLengthStrategyVisibleOnly = "visibleonly"
)

var (
maxPeekMessages int32 = 32
)
var maxPeekMessages int32 = 32

type azureQueueScaler struct {
metricType v2.MetricTargetType
metadata *azureQueueMetadata
metadata azureQueueMetadata
queueClient *azqueue.QueueClient
logger logr.Logger
}

type azureQueueMetadata struct {
targetQueueLength int64
activationTargetQueueLength int64
queueName string
connection string
accountName string
endpointSuffix string
queueLengthStrategy string
triggerIndex int
ActivationQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, default=0"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueLength int64 `keda:"name=queueLength, order=triggerMetadata, default=5"`
Connection string `keda:"name=connection, order=authParams;triggerMetadata;resolvedEnv, optional"`
AccountName string `keda:"name=accountName, order=triggerMetadata, optional"`
EndpointSuffix string `keda:"name=endpointSuffix, order=triggerMetadata, optional"`
QueueLengthStrategy string `keda:"name=queueLengthStrategy, order=triggerMetadata, enum=all;visibleonly, default=all"`
TriggerIndex int
}

// NewAzureQueueScaler creates a new scaler for queue
func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -73,14 +65,14 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

logger := InitializeLogger(config, "azure_queue_scaler")

meta, podIdentity, err := parseAzureQueueMetadata(config, logger)
meta, podIdentity, err := parseAzureQueueMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing azure queue metadata: %w", err)
}

queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.connection, meta.accountName, meta.endpointSuffix, meta.queueName, config.GlobalHTTPTimeout)
queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.Connection, meta.AccountName, meta.EndpointSuffix, meta.QueueName, config.GlobalHTTPTimeout)
if err != nil {
return nil, fmt.Errorf("error creating azure blob client: %w", err)
return nil, fmt.Errorf("error creating azure queue client: %w", err)
}

return &azureQueueScaler{
Expand All @@ -91,105 +83,58 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}, nil
}

func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig) (azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
meta := azureQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength

if val, ok := config.TriggerMetadata[queueLengthMetricName]; ok {
queueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
logger.Error(err, "Error parsing azure queue metadata", "queueLengthMetricName", queueLengthMetricName)
return nil, kedav1alpha1.AuthPodIdentity{},
fmt.Errorf("error parsing azure queue metadata %s: %w", queueLengthMetricName, err)
}

meta.targetQueueLength = queueLength
}

meta.activationTargetQueueLength = 0
if val, ok := config.TriggerMetadata[activationQueueLengthMetricName]; ok {
activationQueueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
logger.Error(err, "Error parsing azure queue metadata", activationQueueLengthMetricName, activationQueueLengthMetricName)
return nil, kedav1alpha1.AuthPodIdentity{},
fmt.Errorf("error parsing azure queue metadata %s: %w", activationQueueLengthMetricName, err)
}

meta.activationTargetQueueLength = activationQueueLength
err := config.TypedConfig(&meta)
if err != nil {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure queue metadata: %w", err)
}

endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.QueueEndpoint)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
}

meta.endpointSuffix = endpointSuffix

if val, ok := config.TriggerMetadata["queueName"]; ok && val != "" {
meta.queueName = val
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given")
}

if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" {
strategy := strings.ToLower(val)
if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly {
meta.queueLengthStrategy = strategy
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val)
}
} else {
meta.queueLengthStrategy = QueueLengthStrategyAll
return meta, kedav1alpha1.AuthPodIdentity{}, err
}
meta.EndpointSuffix = endpointSuffix

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
// Azure Queue Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
if config.AuthParams["connection"] != "" {
// Found the connection in a parameter from TriggerAuthentication
meta.connection = config.AuthParams["connection"]
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
}

if len(meta.connection) == 0 {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
if meta.Connection == "" {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
}
case kedav1alpha1.PodIdentityProviderAzureWorkload:
// If the Use AAD Pod Identity is present then check account name
if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" {
meta.accountName = val
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
if meta.AccountName == "" {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
}
default:
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
}

meta.triggerIndex = config.TriggerIndex

return &meta, config.PodIdentity, nil
meta.TriggerIndex = config.TriggerIndex
return meta, config.PodIdentity, nil
}

func (s *azureQueueScaler) Close(context.Context) error {
return nil
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.QueueName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength),
Target: GetMetricTarget(s.metricType, s.metadata.QueueLength),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queuelen, err := s.getMessageCount(ctx)
if err != nil {
Expand All @@ -198,12 +143,11 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName
}

metric := GenerateMetricInMili(metricName, float64(queuelen))
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationQueueLength, nil
}

func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) {
strategy := strings.ToLower(s.metadata.queueLengthStrategy)
if strategy == QueueLengthStrategyVisibleOnly {
if strings.ToLower(s.metadata.QueueLengthStrategy) == queueLengthStrategyVisibleOnly {
queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages})
if err != nil {
return 0, err
Expand Down
Loading
Loading