Skip to content

Commit

Permalink
batch send sqs messages (#269)
Browse files Browse the repository at this point in the history
* low-level method to send messages in batch

* batch publish in declarative clients
  • Loading branch information
musketyr authored Oct 24, 2024
1 parent f6835d5 commit bdfb885
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 12 deletions.
4 changes: 3 additions & 1 deletion docs/guide/src/docs/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ include::{root-dir}/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/
<6> You can publish a string with custom delay
<7> You can publish a string with custom FIFO queue group
<8> You can publish a string with custom delay and FIFO queue group
<9> You can delete published message using the message ID if the
<9> You can send multiple messages at once when the argument is `Publisher`
<10> If the return type is also publisher type then **you need to subscribe to the publisher to actually send the messages**
<11> You can delete published message using the message ID if the

[source,java,indent=0,options="nowrap",role="secondary"]
.Publishing String Records (AWS SDK 1.x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation "space.jasan:groovy-closure-support:$closureSupportVersion"
implementation 'io.micronaut.validation:micronaut-validation'
implementation 'io.micronaut:micronaut-jackson-databind'
implementation 'io.micronaut.reactor:micronaut-reactor'

testImplementation project(':micronaut-amazon-awssdk-integration-testing')
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package com.agorapulse.micronaut.amazon.awssdk.sqs;

import io.micronaut.core.util.StringUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.sqs.SqsClient;
Expand Down Expand Up @@ -278,6 +280,54 @@ public String sendMessage(String queueName, String messageBody, int delaySeconds
return messageId;
}

@Override
public Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, int delaySeconds, String groupId) {
String queueUrl = getQueueUrl(queueName);

return Flux.from(messageBodies).map(messageBody -> {
SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry
.builder()
.id(UUID.randomUUID().toString())
.messageBody(messageBody);

if (delaySeconds > 0) {
request.delaySeconds(delaySeconds);
}

if (StringUtils.isNotEmpty(groupId)) {
request.messageGroupId(groupId);
}

return request.build();
}).buffer(10).map(batch -> {
SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch);
SendMessageBatchResponse response = client.sendMessageBatch(request.build());
return response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList();
}).flatMap(Flux::fromIterable);
}

@Override
public Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, Consumer<SendMessageBatchRequestEntry.Builder> messageConfiguration) {
String queueUrl = getQueueUrl(queueName);

return Flux.from(messageBodies).map(messageBody -> {
SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry.builder().messageBody(messageBody);
messageConfiguration.accept(request);
return request.build();
}).buffer(10).flatMap(batch -> Flux.<List<String>>generate(synchronousSink -> {
SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch);
SendMessageBatchResponse response = client.sendMessageBatch(request.build());
if (response.failed().isEmpty()) {
synchronousSink.next(response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList());
synchronousSink.complete();
} else {
synchronousSink.error(new IllegalArgumentException("Following messages were not sent:\n" + response.failed().stream().map(e ->
String.format("Message %s failed with code %s and message %s%n", e.id(), e.code(), e.message())
).toList()));
}
})).flatMap(Flux::fromIterable);
}

/**
* @param queueName
* @param messageBody
Expand Down Expand Up @@ -319,9 +369,7 @@ private void addQueue(String queueUrl) {
throw new IllegalStateException("Queue URL cannot be null or empty");
}

synchronized (queueUrlByNames) {
queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl);
}
queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl);
}

private void loadQueues() {
Expand All @@ -340,20 +388,16 @@ private void loadQueues() {
)
);

synchronized (queueUrlByNames) {
queueUrlByNames.clear();
queueUrlByNames.putAll(queueUrls);
}
}

private void removeQueue(String queueUrl) {
if (StringUtils.isEmpty(queueUrl)) {
throw new IllegalStateException("Queue URL cannot be null or empty");
}

synchronized (queueUrlByNames) {
queueUrlByNames.remove(getQueueNameFromUrl(queueUrl));
}
queueUrlByNames.remove(getQueueNameFromUrl(queueUrl));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

Expand Down Expand Up @@ -108,7 +111,7 @@ public Object intercept(MethodInvocationContext<Object, Object> context) {
}

private Object doIntercept(MethodInvocationContext<Object, Object> context, SimpleQueueService service, String queueName, String group, Integer delay) {
Argument[] arguments = context.getArguments();
Argument<?>[] arguments = context.getArguments();
Map<String, Object> params = context.getParameterValueMap();

if (arguments.length == 1 && context.getMethodName().startsWith("delete")) {
Expand Down Expand Up @@ -141,15 +144,40 @@ private Object doIntercept(MethodInvocationContext<Object, Object> context, Simp
return service.sendMessage(queueName, new String((byte[]) message), delay, group);
}

if (Publisher.class.isAssignableFrom(messageType)) {
Publisher<String> messageIdsPublisher;

if (queueArguments.message.getTypeParameters()[0].equalsType(Argument.STRING)) {
messageIdsPublisher = service.sendMessages(queueName, (Publisher<String>) message, delay, group);
} else {
messageIdsPublisher = service.sendMessages(queueName, Flux.from((Publisher<?>) message).map(this::convertMessageToJson), delay, group);
}

if (context.getReturnType().asArgument().isVoid()) {
Flux.from(messageIdsPublisher).subscribe();
return null;
}

if (Publishers.isConvertibleToPublisher(context.getReturnType().getType())) {
return Publishers.convertPublisher(beanContext.getConversionService(), messageIdsPublisher, context.getReturnType().getType());
}

return beanContext.getConversionService().convert(messageIdsPublisher, context.getReturnType().getType());
}

return sendJson(service, queueName, message, delay, group);
}

throw new UnsupportedOperationException("Cannot implement method " + context.getExecutableMethod());
}

private String sendJson(SimpleQueueService service, String queueName, Object message, int delay, String group) {
return service.sendMessage(queueName, convertMessageToJson(message), delay, group);
}

private String convertMessageToJson(Object message) {
try {
return service.sendMessage(queueName, objectMapper.writeValueAsString(message), delay, group);
return objectMapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Failed to marshal " + message + " to JSON", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package com.agorapulse.micronaut.amazon.awssdk.sqs;

import org.reactivestreams.Publisher;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import java.util.List;
Expand Down Expand Up @@ -288,4 +290,86 @@ default String sendMessage(String messageBody, Consumer<SendMessageRequest.Build
* @return message id
*/
String sendMessage(String queueName, String messageBody, Consumer<SendMessageRequest.Builder> messageConfiguration);

/**
* Send message immediately
* @param queueName the name of the queue
* @param messageBodies the message bodies to be sent
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies) {
return sendMessages(queueName, messageBodies, 0);
}

/**
* Send message with given delay.
* @param queueName the name of the queue
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @param groupId group id for FIFO queues
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, int delaySeconds, String groupId);

/**
* Send message with given delay.
* @param queueName the name of the queue
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, int delaySeconds) {
return sendMessages(queueName, messageBodies, delaySeconds, null);
}

/**
* Send message in default queue immediately
* @param messageBodies the message bodies to be sent
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(Publisher<String> messageBodies) {
return sendMessages(getDefaultQueueName(), messageBodies);
}

/**
* Send message in the default queue with given delay.
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(Publisher<String> messageBodies, int delaySeconds) {
return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds);
}

/**
* Send message with given delay.
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @param groupId group id for FIFO queues
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessage(Publisher<String> messageBodies, int delaySeconds, String groupId) {
return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds, groupId);
}

/**
* Sends message with additional configuration into the default queue.
* @param messageBodies the message bodies to be sent
* @param messageConfiguration additional configuration
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(Publisher<String> messageBodies, Consumer<SendMessageBatchRequestEntry.Builder> messageConfiguration) {
return sendMessages(getDefaultQueueName(), messageBodies, messageConfiguration);
}

/**
* Sends message with additional configuration into the given queue.
* @param queueName name of the queue
* @param messageBodies the message bodies to be sent
* @param messageConfiguration additional configuration
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, Consumer<SendMessageBatchRequestEntry.Builder> messageConfiguration);


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import groovy.lang.DelegatesTo;
import groovy.transform.stc.ClosureParams;
import groovy.transform.stc.FromString;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import space.jasan.support.groovy.closure.ConsumerWithDelegate;

Expand Down Expand Up @@ -69,4 +71,40 @@ public static String sendMessage(
return self.sendMessage(queueName, messageBody, ConsumerWithDelegate.create(messageConfiguration));
}

/**
* Sends message with additional configuration into the default queue.
*
* @param messageBodies message bodies
* @param messageConfiguration additional configuration
* @return message id
*/
public static Publisher<String> sendMessages(
SimpleQueueService self,
Publisher<String> messageBodies,
@DelegatesTo(value = SendMessageBatchRequestEntry.Builder.class, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder")
Closure<?> messageConfiguration
) {
return self.sendMessages(self.getDefaultQueueName(), messageBodies, ConsumerWithDelegate.create(messageConfiguration));
}

/**
* Sends message with additional configuration into the given queue.
*
* @param queueName name of the queue
* @param messageBodies message bodies
* @param messageConfiguration additional configuration
* @return message id
*/
public static Publisher<String> sendMessages(
SimpleQueueService self,
String queueName,
Publisher<String> messageBodies,
@DelegatesTo(value = SendMessageRequest.Builder.class, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder")
Closure<?> messageConfiguration
) {
return self.sendMessages(queueName, messageBodies, ConsumerWithDelegate.create(messageConfiguration));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.Queue;
import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.QueueClient;
import org.reactivestreams.Publisher;

@QueueClient // <1>
interface DefaultClient {
Expand All @@ -38,7 +39,11 @@ interface DefaultClient {

String sendMessage(String record, int delay, String group); // <8>

void deleteMessage(String messageId); // <9>
void sendStringMessages(Publisher<String> messages); // <9>

Publisher<String> sendMessages(Publisher<Pogo> messages); // <10>

void deleteMessage(String messageId); // <11>

String OTHER_QUEUE = "OtherQueue";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs
import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.context.ApplicationContext
import io.micronaut.inject.qualifiers.Qualifiers
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import spock.lang.AutoCleanup
import spock.lang.Specification

Expand Down Expand Up @@ -146,6 +148,29 @@ class QueueClientSpec extends Specification {
1 * defaultService.sendMessage(DEFAULT_QUEUE_NAME, MESSAGE, DELAY, GROUP) >> ID
}

void 'can send multiple messages when publisher is a parameter'() {
given:
List<String> ids = [ID + 1, ID + 2, ID + 3]
DefaultClient client = context.getBean(DefaultClient)

when:
Publisher<String> messages = client.sendMessages(Flux.just(POGO, POGO, POGO))

then:
Flux.from(messages).collectList().block() == ids

1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3)
}

void 'can send multiple string messages and return void'() {
given:
DefaultClient client = context.getBean(DefaultClient)
when:
client.sendStringMessages(Flux.just(MESSAGE, MESSAGE, MESSAGE))
then:
1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3)
}

void 'needs to follow the method convention rules'() {
given:
TestClient client = context.getBean(TestClient)
Expand Down
Loading

0 comments on commit bdfb885

Please sign in to comment.