diff --git a/docs/guide/src/docs/asciidoc/dynamodb.adoc b/docs/guide/src/docs/asciidoc/dynamodb.adoc index 47a999d1c..95dff7e2e 100644 --- a/docs/guide/src/docs/asciidoc/dynamodb.adoc +++ b/docs/guide/src/docs/asciidoc/dynamodb.adoc @@ -498,6 +498,8 @@ The following table summarizes the supported method signatures: `List getAll(@HashKey String parentId, String... rangeKeys);` + `List getAll(@HashKey List parentIds);` + | Loads a single entity or a list of entities from the table. Range key is required for tables which defines the range key diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbService.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbService.java index 408604e40..feb975d2f 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbService.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbService.java @@ -145,6 +145,8 @@ default Publisher updateAll(Publisher items, Function getAll(Object partitionKey, Publisher sortKeys); + Publisher getAll(Publisher partitionKeys); + Publisher get(Key key); Publisher count(DetachedQuery query); diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java index 337180d60..7437b60fe 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/AsyncDynamoDbServiceIntroduction.java @@ -284,6 +284,9 @@ private Publisher handleGet(AsyncDynamoDbService service, MethodInvoca Object partitionValue = partitionAndSort.getPartitionValue(params); if (!partitionAndSort.hasSortKey()) { + if (partitionAndSort.isPartitionKeyPublisherOrIterable()) { + return service.getAll(partitionAndSort.getPartitionAttributeValues(conversionService, params)); + } return service.get(partitionValue, null); } diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java index b2e5bff37..f49ec456f 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.java @@ -230,6 +230,11 @@ public Publisher getAll(Object partitionKey, Publisher sortKeys) { return doWithKeys(partitionKey, sortKeys, this::getAll); } + @Override + public Publisher getAll(Publisher partitionKeys) { + return doWithKeys(partitionKeys, this::getAllByAttributeValue); + } + @Override public Publisher get(Key key) { return Mono.fromFuture(table.getItem(key)).map(this::postLoad); @@ -289,6 +294,19 @@ private DetachedQuery simplePartitionAndSort(Object partitionKey, Object sort }); } + private Publisher getAllByAttributeValue(Publisher partitionKeys) { + TableSchema tableSchema = table.tableSchema(); + Map order = new ConcurrentHashMap<>(); + AtomicInteger counter = new AtomicInteger(); + Comparator comparator = Comparator.comparingInt(i -> order.getOrDefault(tableSchema.attributeValue(i, tableSchema.tableMetadata().primaryPartitionKey()), 0)); + + return Flux.from(partitionKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> b.readBatches(batchRangeKeys.stream().map(k -> { + order.put(k, counter.getAndIncrement()); + return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(k).build()).build(); + }).toList()))).flatMap(r -> Flux.from(r.resultsForTable(table)).map(this::postLoad)).sort(comparator); + + } + private Publisher getAll(AttributeValue hashKey, Publisher rangeKeys) { TableSchema tableSchema = table.tableSchema(); Map order = new ConcurrentHashMap<>(); @@ -377,4 +395,9 @@ private Publisher doWithKeys(Object partitionKey, Publisher sortKeys, Optional sortKeyName = table.tableSchema().tableMetadata().primarySortKey(); return function.apply(partitionKeyValue, Flux.from(sortKeys).map(key -> attributeConversionHelper.convert(table, sortKeyName.get(), key))); } + + private Publisher doWithKeys(Publisher partitionKeys, Function, Publisher> function) { + String hashKeyName = table.tableSchema().tableMetadata().primaryPartitionKey(); + return function.apply(Flux.from(partitionKeys).map(key -> attributeConversionHelper.convert(table, hashKeyName, key))); + } } diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java index 27329de46..cafc5b9d2 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDbService.java @@ -224,6 +224,11 @@ public Publisher getAll(Object partitionKey, Publisher sortKeys) { return doWithKeys(partitionKey, sortKeys, this::getAll); } + @Override + public Publisher getAll(Publisher partitionKeys) { + return doWithKeys(partitionKeys, this::getAllByAttributeValue); + } + @Override public T get(Key key) { T item = table.getItem(key); @@ -286,6 +291,22 @@ private DetachedQuery simplePartitionAndSort(Object partitionKey, Object sort }); } + private Publisher getAllByAttributeValue(Publisher partitionKeys) { + TableSchema tableSchema = table.tableSchema(); + Map order = new ConcurrentHashMap<>(); + AtomicInteger counter = new AtomicInteger(); + + return Flux.from(partitionKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> b.readBatches(batchRangeKeys.stream().map(k -> { + order.put(k, counter.getAndIncrement()); + return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(k).build()).build(); + }) + .collect(Collectors.toList())))).flatMap(r -> { + Comparator comparator = Comparator.comparingInt(i -> order.getOrDefault(tableSchema.attributeValue(i, tableSchema.tableMetadata().primaryPartitionKey()), 0)); + List it = r.resultsForTable(table).stream().sorted(comparator).collect(Collectors.toList()); + return Flux.fromIterable(it).map(this::postLoad); + }); + } + private Publisher getAll(AttributeValue hashKey, Publisher rangeKeys) { TableSchema tableSchema = table.tableSchema(); Map order = new ConcurrentHashMap<>(); @@ -377,4 +398,9 @@ private Publisher doWithKeys(Object partitionKey, Publisher sortKeys, Optional sortKeyName = table.tableSchema().tableMetadata().primarySortKey(); return function.apply(partitionKeyValue, Flux.from(sortKeys).map(key -> attributeConversionHelper.convert(table, sortKeyName.get(), key))); } + + private Publisher doWithKeys(Publisher partitionKeys, Function, Publisher> function) { + String hashKeyName = table.tableSchema().tableMetadata().primaryPartitionKey(); + return function.apply(Flux.from(partitionKeys).map(key -> attributeConversionHelper.convert(table, hashKeyName, key))); + } } diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDbService.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDbService.java index 4adf6d819..45700e323 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDbService.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDbService.java @@ -145,6 +145,8 @@ default int updateAll(Publisher items, Function, Upda Publisher getAll(Object partitionKey, Publisher sortKeys); + Publisher getAll(Publisher partitionKeys); + T get(Key key); int count(DetachedQuery query); diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/SyncDynamoDbServiceIntroduction.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/SyncDynamoDbServiceIntroduction.java index 2ff00bea5..89ae08d37 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/SyncDynamoDbServiceIntroduction.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/SyncDynamoDbServiceIntroduction.java @@ -232,6 +232,9 @@ private Object handleGet(DynamoDbService service, MethodInvocationContext Object partitionValue = partitionAndSort.getPartitionValue(params); if (!partitionAndSort.hasSortKey()) { + if (partitionAndSort.isPartitionKeyPublisherOrIterable()) { + return publisherOrIterable(service.getAll(partitionAndSort.getPartitionAttributeValues(conversionService, params)), context.getReturnType().getType()); + } return service.get(partitionValue, null); } diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/HashKey.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/HashKey.java index 0a963e4e8..49425772b 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/HashKey.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/HashKey.java @@ -29,6 +29,7 @@ * * @deprecated use @{@link PartitionKey} instead */ +@Deprecated @Inherited @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/RangeKey.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/RangeKey.java index a32f85d68..0cd692768 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/RangeKey.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/annotation/RangeKey.java @@ -29,6 +29,7 @@ * * @deprecated use @{@link SortKey} instead */ +@Deprecated @Inherited @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/util/QueryArguments.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/util/QueryArguments.java index 87c53315c..09cd0b0d9 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/util/QueryArguments.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/main/java/com/agorapulse/micronaut/amazon/awssdk/dynamodb/util/QueryArguments.java @@ -157,6 +157,10 @@ public Publisher getSortAttributeValues(ConversionService conversionService, return sortKey == null ? Flux.empty() : toPublisher(conversionService, Object.class, sortKey.getFirstArgument(), params); } + public Publisher getPartitionAttributeValues(ConversionService conversionService, Map> params) { + return partitionKey == null ? Flux.empty() : toPublisher(conversionService, Object.class, partitionKey, params); + } + public Consumer> generateQuery(MethodInvocationContext context, ConversionService conversionService) { return q -> { @@ -217,6 +221,10 @@ public boolean isSortKeyPublisherOrIterable() { return sortKey.getFirstArgument().getType().isArray() || Iterable.class.isAssignableFrom(sortKey.getFirstArgument().getType()) || Publisher.class.isAssignableFrom(sortKey.getFirstArgument().getType()); } + public boolean isPartitionKeyPublisherOrIterable() { + return partitionKey.getType().isArray() || Iterable.class.isAssignableFrom(partitionKey.getType()) || Publisher.class.isAssignableFrom(partitionKey.getType()); + } + public boolean isCustomized() { return index != null || consistent || descending || !filters.isEmpty() || sortKey != null && sortKey.getOperator() != Filter.Operator.EQ || lastEvaluatedKey != null || limit != null || page != null; } diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDBServiceNoRangeSpec.groovy b/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDBServiceNoRangeSpec.groovy index be62ed396..38fce11a8 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDBServiceNoRangeSpec.groovy +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultDynamoDBServiceNoRangeSpec.groovy @@ -60,6 +60,7 @@ class DefaultDynamoDBServiceNoRangeSpec extends Specification { service.count('5') == 1 service.get('6') service.count('6') == 1 + service.getAll(['1', '2', '3', '4', '5', '6']).parentId == ['1', '2', '3', '4', '5', '6'] when: service.increment('1') diff --git a/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDBEntityNoRangeService.java b/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDBEntityNoRangeService.java index 3eba671b5..3e2a2e87b 100644 --- a/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDBEntityNoRangeService.java +++ b/subprojects/micronaut-amazon-awssdk-dynamodb/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/dynamodb/DynamoDBEntityNoRangeService.java @@ -35,6 +35,7 @@ public interface DynamoDBEntityNoRangeService { DynamoDBEntityNoRange get(@PartitionKey String parentId); + List getAll(@PartitionKey List parentIds); DynamoDBEntityNoRange save(DynamoDBEntityNoRange entity); Flowable saveAll(DynamoDBEntityNoRange... entities);