diff --git a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala index 5dd3bb7..eff2872 100644 --- a/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala +++ b/src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala @@ -15,7 +15,7 @@ package org.apache.pekko.persistence.dynamodb.journal import com.amazonaws.AmazonWebServiceRequest import com.amazonaws.handlers.AsyncHandler -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync import com.amazonaws.services.dynamodbv2.model._ import org.apache.pekko import pekko.actor.{ ActorRef, Scheduler } @@ -25,9 +25,8 @@ import pekko.persistence.dynamodb.{ DynamoDBConfig, Item } import pekko.util.ccompat.JavaConverters._ import java.util.{ concurrent => juc } - -import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContext, Future, Promise } case class LatencyReport(nanos: Long, retries: Int) private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis) @@ -36,7 +35,7 @@ trait DynamoDBHelper { implicit val ec: ExecutionContext val scheduler: Scheduler - val dynamoDB: AmazonDynamoDBAsyncClient + val dynamoDB: AmazonDynamoDBAsync val log: LoggingAdapter val settings: DynamoDBConfig import settings._ diff --git a/src/main/scala/org/apache/pekko/persistence/dynamodb/package.scala b/src/main/scala/org/apache/pekko/persistence/dynamodb/package.scala index 76f04c6..9ca736a 100644 --- a/src/main/scala/org/apache/pekko/persistence/dynamodb/package.scala +++ b/src/main/scala/org/apache/pekko/persistence/dynamodb/package.scala @@ -13,16 +13,18 @@ package org.apache.pekko.persistence -import java.nio.ByteBuffer -import java.util.concurrent.Executors +import com.amazonaws.auth.{ AWSStaticCredentialsProvider, BasicAWSCredentials } +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration +import com.amazonaws.regions.Regions +import com.amazonaws.services.dynamodbv2.model.{ AttributeValue, AttributeValueUpdate } +import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDBAsync, AmazonDynamoDBAsyncClientBuilder } import org.apache.pekko.actor.{ ActorSystem, Scheduler } import org.apache.pekko.dispatch.ExecutionContexts import org.apache.pekko.event.{ Logging, LoggingAdapter } import org.apache.pekko.persistence.dynamodb.journal.DynamoDBHelper -import com.amazonaws.auth.BasicAWSCredentials -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient -import com.amazonaws.services.dynamodbv2.model.{ AttributeValue, AttributeValueUpdate } +import java.nio.ByteBuffer +import java.util.concurrent.Executors import java.util.{ Map => JMap } import scala.collection.generic.CanBuildFrom import scala.concurrent.{ ExecutionContext, Future, Promise } @@ -69,16 +71,23 @@ package object dynamodb { val conns = settings.client.config.getMaxConnections val executor = Executors.newFixedThreadPool(conns) val creds = new BasicAWSCredentials(settings.AwsKey, settings.AwsSecret) - new AmazonDynamoDBAsyncClient(creds, settings.client.config, executor) + AmazonDynamoDBAsyncClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(creds)) + .withClientConfiguration(settings.client.config) + .withExecutorFactory(() => executor) + .withEndpointConfiguration(new EndpointConfiguration(settings.Endpoint, Regions.DEFAULT_REGION.name())) + .build() } else { - new AmazonDynamoDBAsyncClient(settings.client.config) + AmazonDynamoDBAsyncClientBuilder.standard() + .withClientConfiguration(settings.client.config) + .withEndpointConfiguration(new EndpointConfiguration(settings.Endpoint, Regions.DEFAULT_REGION.name())) + .build() } - client.setEndpoint(settings.Endpoint) val dispatcher = system.dispatchers.lookup(settings.ClientDispatcher) class DynamoDBClient( override val ec: ExecutionContext, - override val dynamoDB: AmazonDynamoDBAsyncClient, + override val dynamoDB: AmazonDynamoDBAsync, override val settings: DynamoDBConfig, override val scheduler: Scheduler, override val log: LoggingAdapter)