Skip to content

Commit a5b72c6

Browse files
committed
Refactor azure queue scaler
Signed-off-by: rickbrouwer <rickbrouwer@gmail.com>
1 parent b2ce95d commit a5b72c6

File tree

3 files changed

+307
-147
lines changed

3 files changed

+307
-147
lines changed

pkg/scalers/aws_sqs_queue_scaler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
)
2121

2222
const (
23+
defaultTargetQueueLength = 5
2324
targetQueueLengthDefault = 5
2425
activationTargetQueueLengthDefault = 0
2526
defaultScaleOnInFlight = true

pkg/scalers/azure_queue_scaler.go

Lines changed: 41 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package scalers
1919
import (
2020
"context"
2121
"fmt"
22-
"strconv"
2322
"strings"
2423

2524
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
@@ -34,37 +33,34 @@ import (
3433
)
3534

3635
const (
37-
queueLengthMetricName = "queueLength"
38-
activationQueueLengthMetricName = "activationQueueLength"
39-
defaultTargetQueueLength = 5
40-
externalMetricType = "External"
41-
QueueLengthStrategyAll string = "all"
42-
QueueLengthStrategyVisibleOnly string = "visibleonly"
36+
externalMetricType = "External"
37+
queueLengthStrategyVisibleOnly = "visibleonly"
4338
)
4439

45-
var (
46-
maxPeekMessages int32 = 32
47-
)
40+
var maxPeekMessages int32 = 32
4841

4942
type azureQueueScaler struct {
5043
metricType v2.MetricTargetType
51-
metadata *azureQueueMetadata
44+
metadata azureQueueMetadata
5245
queueClient *azqueue.QueueClient
5346
logger logr.Logger
5447
}
5548

5649
type azureQueueMetadata struct {
57-
targetQueueLength int64
58-
activationTargetQueueLength int64
59-
queueName string
60-
connection string
61-
accountName string
62-
endpointSuffix string
63-
queueLengthStrategy string
64-
triggerIndex int
50+
ActivationQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, default=0"`
51+
QueueName string `keda:"name=queueName, order=triggerMetadata"`
52+
QueueLength int64 `keda:"name=queueLength, order=triggerMetadata, default=5"`
53+
Connection string `keda:"name=connection, order=authParams;triggerMetadata;resolvedEnv, optional"`
54+
AccountName string `keda:"name=accountName, order=triggerMetadata, optional"`
55+
EndpointSuffix string `keda:"name=endpointSuffix, order=triggerMetadata, optional"`
56+
QueueLengthStrategy string `keda:"name=queueLengthStrategy, order=triggerMetadata, enum=all;visibleonly, default=all"`
57+
TriggerIndex int
58+
}
59+
60+
func (m *azureQueueMetadata) Validate() error {
61+
return nil
6562
}
6663

67-
// NewAzureQueueScaler creates a new scaler for queue
6864
func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
6965
metricType, err := GetMetricTargetType(config)
7066
if err != nil {
@@ -73,14 +69,14 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
7369

7470
logger := InitializeLogger(config, "azure_queue_scaler")
7571

76-
meta, podIdentity, err := parseAzureQueueMetadata(config, logger)
72+
meta, podIdentity, err := parseAzureQueueMetadata(config)
7773
if err != nil {
7874
return nil, fmt.Errorf("error parsing azure queue metadata: %w", err)
7975
}
8076

81-
queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.connection, meta.accountName, meta.endpointSuffix, meta.queueName, config.GlobalHTTPTimeout)
77+
queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.Connection, meta.AccountName, meta.EndpointSuffix, meta.QueueName, config.GlobalHTTPTimeout)
8278
if err != nil {
83-
return nil, fmt.Errorf("error creating azure blob client: %w", err)
79+
return nil, fmt.Errorf("error creating azure queue client: %w", err)
8480
}
8581

8682
return &azureQueueScaler{
@@ -91,105 +87,63 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
9187
}, nil
9288
}
9389

94-
func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
90+
func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig) (azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
9591
meta := azureQueueMetadata{}
96-
meta.targetQueueLength = defaultTargetQueueLength
97-
98-
if val, ok := config.TriggerMetadata[queueLengthMetricName]; ok {
99-
queueLength, err := strconv.ParseInt(val, 10, 64)
100-
if err != nil {
101-
logger.Error(err, "Error parsing azure queue metadata", "queueLengthMetricName", queueLengthMetricName)
102-
return nil, kedav1alpha1.AuthPodIdentity{},
103-
fmt.Errorf("error parsing azure queue metadata %s: %w", queueLengthMetricName, err)
104-
}
105-
106-
meta.targetQueueLength = queueLength
92+
err := config.TypedConfig(&meta)
93+
if err != nil {
94+
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure queue metadata: %w", err)
10795
}
10896

109-
meta.activationTargetQueueLength = 0
110-
if val, ok := config.TriggerMetadata[activationQueueLengthMetricName]; ok {
111-
activationQueueLength, err := strconv.ParseInt(val, 10, 64)
112-
if err != nil {
113-
logger.Error(err, "Error parsing azure queue metadata", activationQueueLengthMetricName, activationQueueLengthMetricName)
114-
return nil, kedav1alpha1.AuthPodIdentity{},
115-
fmt.Errorf("error parsing azure queue metadata %s: %w", activationQueueLengthMetricName, err)
116-
}
117-
118-
meta.activationTargetQueueLength = activationQueueLength
97+
err = meta.Validate()
98+
if err != nil {
99+
return meta, kedav1alpha1.AuthPodIdentity{}, err
119100
}
120101

121102
endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.QueueEndpoint)
122103
if err != nil {
123-
return nil, kedav1alpha1.AuthPodIdentity{}, err
124-
}
125-
126-
meta.endpointSuffix = endpointSuffix
127-
128-
if val, ok := config.TriggerMetadata["queueName"]; ok && val != "" {
129-
meta.queueName = val
130-
} else {
131-
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given")
132-
}
133-
134-
if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" {
135-
strategy := strings.ToLower(val)
136-
if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly {
137-
meta.queueLengthStrategy = strategy
138-
} else {
139-
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val)
140-
}
141-
} else {
142-
meta.queueLengthStrategy = QueueLengthStrategyAll
104+
return meta, kedav1alpha1.AuthPodIdentity{}, err
143105
}
106+
meta.EndpointSuffix = endpointSuffix
144107

145108
// If the Use AAD Pod Identity is not present, or set to "none"
146109
// then check for connection string
147110
switch config.PodIdentity.Provider {
148111
case "", kedav1alpha1.PodIdentityProviderNone:
149112
// Azure Queue Scaler expects a "connection" parameter in the metadata
150113
// of the scaler or in a TriggerAuthentication object
151-
if config.AuthParams["connection"] != "" {
152-
// Found the connection in a parameter from TriggerAuthentication
153-
meta.connection = config.AuthParams["connection"]
154-
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
155-
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
156-
}
157-
158-
if len(meta.connection) == 0 {
159-
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
114+
if meta.Connection == "" {
115+
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
160116
}
161117
case kedav1alpha1.PodIdentityProviderAzureWorkload:
162118
// If the Use AAD Pod Identity is present then check account name
163-
if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" {
164-
meta.accountName = val
165-
} else {
166-
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
119+
if meta.AccountName == "" {
120+
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
167121
}
168122
default:
169-
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
123+
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
170124
}
171125

172-
meta.triggerIndex = config.TriggerIndex
173-
174-
return &meta, config.PodIdentity, nil
126+
meta.TriggerIndex = config.TriggerIndex
127+
return meta, config.PodIdentity, nil
175128
}
176129

177130
func (s *azureQueueScaler) Close(context.Context) error {
178131
return nil
179132
}
180133

134+
// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
181135
func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
136+
metricName := kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.QueueName))
182137
externalMetric := &v2.ExternalMetricSource{
183138
Metric: v2.MetricIdentifier{
184-
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.queueName))),
139+
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName),
185140
},
186-
Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength),
141+
Target: GetMetricTarget(s.metricType, s.metadata.QueueLength),
187142
}
188143
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
189144
return []v2.MetricSpec{metricSpec}
190145
}
191146

192-
// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
193147
func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
194148
queuelen, err := s.getMessageCount(ctx)
195149
if err != nil {
@@ -198,12 +152,11 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName
198152
}
199153

200154
metric := GenerateMetricInMili(metricName, float64(queuelen))
201-
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil
155+
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationQueueLength, nil
202156
}
203157

204158
func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) {
205-
strategy := strings.ToLower(s.metadata.queueLengthStrategy)
206-
if strategy == QueueLengthStrategyVisibleOnly {
159+
if strings.ToLower(s.metadata.QueueLengthStrategy) == queueLengthStrategyVisibleOnly {
207160
queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages})
208161
if err != nil {
209162
return 0, err

0 commit comments

Comments
 (0)