Skip to content

Commit

Permalink
Enhance SQS example
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Sep 28, 2023
1 parent fe898e5 commit dd31778
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/main/resources/localstack/init_sqs.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh

awslocal sqs create-queue --queue-name mysqs-queue --region us-east-1
echo "Initialized."
echo "Initialized AWS standard queue."
51 changes: 34 additions & 17 deletions src/main/scala/alpakka/sqs/SqsEcho.scala
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
package alpakka.sqs

import akka.actor.ActorSystem
import akka.stream.alpakka.sqs.scaladsl.{SqsAckFlow, SqsPublishSink, SqsSource}
import akka.stream.alpakka.sqs.{MessageAction, SqsPublishGroupedSettings, SqsSourceSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.alpakka.sqs.scaladsl.{SqsAckSink, SqsPublishSink, SqsSource}
import akka.stream.alpakka.sqs._
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.github.matsluni.akkahttpspi.AkkaHttpClient
import org.apache.commons.validator.routines.UrlValidator
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, CreateQueueResponse, DeleteQueueRequest, DeleteQueueResponse}
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, DeleteQueueResponse, Message}

import java.net.URI
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, SECONDS}

/**
* Echo flow via a SQS queue:
* - upload n String msgs
* - download n String msgs
* Echo flow via a SQS standard queue:
* - produce n String msgs
* - consume n String msgs
*
* Run this class against your AWS account using hardcoded accessKey/secretKey
* or
* Run via [[alpakka.sqs.SqsEchoIT]] against localstack docker container
* Run via [[alpakka.sqs.SqsEchoIT]] against localStack docker container
*
* Remarks:
* - For convenience we use the async `awsSqsClient` to create/delete the queue
Expand Down Expand Up @@ -68,7 +69,7 @@ class SqsEcho(urlWithMappedPort: URI = new URI(""), accessKey: String = "", secr
system.registerOnTermination(awsSqsClient.close())

def run(): Int = {
queueUrl = createQueue()
queueUrl = createStandardQueue()
logger.info(s"Created queue with URL: $queueUrl")

val done = for {
Expand All @@ -92,23 +93,39 @@ class SqsEcho(urlWithMappedPort: URI = new URI(""), accessKey: String = "", secr
}

private def consumerClient() = {
logger.info(s"About to start consuming from URL: {}", queueUrl)
logger.info(s"About to start consuming msgs from URL: {}", queueUrl)
// Grace time to process msgs on server
Thread.sleep(1000)

val settings = SqsSourceSettings()
// Long polling to avoid (expensive) empty reads, MAX is 20s
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
.withWaitTime(2.seconds)
.withMaxBufferSize(100)
.withMaxBatchSize(2)
.withAttributes(immutable.Seq(SenderId, SentTimestamp))
// Let the stream complete when there are no more messages on the queue
// In realistic scenarios, you should add a KillSwitch to the stream
.withCloseOnEmptyReceive(true)
// Invisible for other concurrent consumers
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
.withVisibilityTimeout(10.seconds)

val ackFlow = Flow[Message]
.map(MessageAction.Delete(_))
.to(SqsAckSink(queueUrl)) // Handle ack/deletion (via internal ReceiptHandle)

val messages =
SqsSource(
queueUrl,
SqsSourceSettings().withCloseOnEmptyReceive(true).withWaitTime(10.millis))
.map(MessageAction.delete)
.via(SqsAckFlow(queueUrl)) // Handle ack/deletion (via internal receipt handle)
SqsSource(queueUrl, settings)
.alsoTo(ackFlow)
.wireTap(msg => logger.info(s"Received msg: $msg"))
.runWith(Sink.seq)
messages
}

// When the queue already exists, return it's queueUrl
private def createQueue(): String = {
val response: CreateQueueResponse = awsSqsClient
private def createStandardQueue(): String = {
val response = awsSqsClient
.createQueue(
CreateQueueRequest.builder()
.queueName(queueName)
Expand Down
1 change: 1 addition & 0 deletions src/test/scala/alpakka/sqs/SqsEchoIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class SqsEchoIT {
.withServices(SQS)
// https://docs.localstack.cloud/user-guide/aws/sqs/#queue-urls
.withEnv("SQS_ENDPOINT_STRATEGY", "domain")
// Redundant to createQueue(), left to show that this is a way to init a queue
.withCopyFileToContainer(MountableFile.forClasspathResource("/localstack/init_sqs.sh", 700), "/etc/localstack/init/ready.d/init_sqs.sh");

@BeforeAll
Expand Down

0 comments on commit dd31778

Please sign in to comment.