Skip to content

Commit

Permalink
Add User Agent for Sqs Calls made using Automatic Batching Manager (#…
Browse files Browse the repository at this point in the history
…5546)

* Add User Agent for Sqs Calls made using Automatic Batching Manager as hll/abm

* Review comments
  • Loading branch information
joviegas authored Sep 3, 2024
1 parent 041eaa4 commit e570e2b
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -51,26 +53,34 @@ protected ChangeMessageVisibilityBatchManager(RequestBatchConfiguration override

private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest(
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
List<ChangeMessageVisibilityBatchRequestEntry> entries = identifiedRequests
.stream()
.map(identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry(identifiedRequest.id(),
identifiedRequest.message()))
.collect(Collectors.toList());
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
// request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()

List<ChangeMessageVisibilityBatchRequestEntry> entries =
identifiedRequests.stream()
.map(identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry(
identifiedRequest.id(),
identifiedRequest.message()))
.collect(Collectors.toList());

// All requests have the same overrideConfiguration, so it's sufficient to retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
.message()
.overrideConfiguration();
return overrideConfiguration.map(
config -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(config)
.entries(entries)
.build())
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.entries(entries)
.build());

return overrideConfiguration
.map(config -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(config.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build())
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(o -> o
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build());
}

private static ChangeMessageVisibilityBatchRequestEntry createChangeMessageVisibilityBatchRequestEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -50,20 +52,38 @@ protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfigurat

private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
List<IdentifiableMessage<DeleteMessageRequest>> identifiedRequests, String batchKey) {

List<DeleteMessageBatchRequestEntry> entries = identifiedRequests
.stream()
.map(identifiedRequest -> createDeleteMessageBatchRequestEntry(identifiedRequest.id(),
identifiedRequest.message()))
.map(identifiedRequest -> createDeleteMessageBatchRequestEntry(
identifiedRequest.id(), identifiedRequest.message()
))
.collect(Collectors.toList());

// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
// request.
// all requests must have the same overrideConfiguration, so it is sufficient to retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()
.overrideConfiguration();

return overrideConfiguration.map(
overrideConfig -> DeleteMessageBatchRequest.builder().queueUrl(batchKey).overrideConfiguration(overrideConfig)
.entries(entries).build()).orElse(
DeleteMessageBatchRequest.builder().queueUrl(batchKey).entries(entries).build());
overrideConfig -> DeleteMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(
overrideConfig.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build()
)
.entries(entries)
.build()
).orElseGet(
() -> DeleteMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(o ->
o.applyMutation(USER_AGENT_APPLIER).build()
)
.entries(entries)
.build()
);
}

private static DeleteMessageBatchRequestEntry createDeleteMessageBatchRequestEntry(String id, DeleteMessageRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -138,22 +140,21 @@ private CompletableFuture<Map<QueueAttributeName, String>> fetchQueueAttributes(
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(queueUrl)
.attributeNames(QUEUE_ATTRIBUTE_NAMES)
.overrideConfiguration(o -> o
.applyMutation(USER_AGENT_APPLIER))
.build();

CompletableFuture<Map<QueueAttributeName, String>> future =
sqsClient.getQueueAttributes(request)
.thenApply(response -> {
Map<QueueAttributeName, String> attributes = response.attributes();
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
+ " attribute is null in SQS.");
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
return attributes.entrySet().stream()
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
});

return future;
return sqsClient.getQueueAttributes(request)
.thenApply(response -> {
Map<QueueAttributeName, String> attributes = response.attributes();
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
+ " attribute is null in SQS.");
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
return attributes.entrySet().stream()
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package software.amazon.awssdk.services.sqs.internal.batchmanager;


import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -72,7 +74,8 @@ public CompletableFuture<ReceiveSqsMessageHelper> asyncReceiveMessage() {
ReceiveMessageRequest.Builder request =
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(config.maxBatchItems());
.maxNumberOfMessages(config.maxBatchItems())
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER));

if (!CollectionUtils.isNullOrEmpty(config.messageSystemAttributeNames())) {
request.messageSystemAttributeNames(config.messageSystemAttributeNames());
Expand Down Expand Up @@ -158,10 +161,12 @@ private CompletableFuture<ChangeMessageVisibilityBatchResponse> nackMessages() {
.build())
.collect(Collectors.toList());

ChangeMessageVisibilityBatchRequest batchRequest = ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entries)
.build();
ChangeMessageVisibilityBatchRequest batchRequest =
ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entries)
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER))
.build();

return asyncClient.changeMessageVisibilityBatch(batchRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -83,19 +85,31 @@ private static IdentifiableMessage<SendMessageResponse> createSendMessageRespons

private static SendMessageBatchRequest createSendMessageBatchRequest(
List<IdentifiableMessage<SendMessageRequest>> identifiedRequests, String batchKey) {
List<SendMessageBatchRequestEntry> entries = identifiedRequests
.stream()
.map(identifiedRequest -> createSendMessageBatchRequestEntry(identifiedRequest.id(), identifiedRequest.message()))
.collect(Collectors.toList());
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
// request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()

List<SendMessageBatchRequestEntry> entries =
identifiedRequests.stream()
.map(identifiedRequest -> createSendMessageBatchRequestEntry(identifiedRequest.id(),
identifiedRequest.message()))
.collect(Collectors.toList());

// All requests must have the same overrideConfiguration, so retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
.message()
.overrideConfiguration();
return overrideConfiguration.map(
overrideConfig -> SendMessageBatchRequest.builder().queueUrl(batchKey).overrideConfiguration(overrideConfig)
.entries(entries).build()).orElse(
SendMessageBatchRequest.builder().queueUrl(batchKey).entries(entries).build());

return overrideConfiguration
.map(overrideConfig -> SendMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(overrideConfig.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build())
.orElseGet(() -> SendMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER))
.entries(entries)
.build());
}

private static SendMessageBatchRequestEntry createSendMessageBatchRequestEntry(String id, SendMessageRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.ApiName;

@SdkInternalApi
public final class SqsMessageDefault {
Expand All @@ -24,6 +27,11 @@ public final class SqsMessageDefault {

public static final int MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB

// abm stands for Automatic Batching Manager
public static final Consumer<AwsRequestOverrideConfiguration.Builder> USER_AGENT_APPLIER =
b -> b.addApiName(ApiName.builder().version("abm").name("hll").build());


/**
* <a href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes">
* AWS SQS Message Attributes Documentation</a>
Expand Down
Loading

0 comments on commit e570e2b

Please sign in to comment.