diff --git a/.changes/next-release/bugfix-AmazonS3-7347c3a.json b/.changes/next-release/bugfix-AmazonS3-7347c3a.json new file mode 100644 index 000000000000..fe299d7d7def --- /dev/null +++ b/.changes/next-release/bugfix-AmazonS3-7347c3a.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Amazon S3", + "contributor": "", + "description": "Fix NPE issue thrown when using multipart S3 client to upload an object containing empty content without supplying a content length. See [#6464](https://github.com/aws/aws-sdk-java-v2/issues/6464)" +} diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java index de20ce7e6331..aed75193c0ea 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java @@ -89,6 +89,7 @@ import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.IoUtils; import software.amazon.awssdk.utils.Md5Utils; +import software.amazon.awssdk.utils.StringInputStream; @Timeout(value = 60, unit = SECONDS) public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTestBase { @@ -102,6 +103,7 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest private static ExecutorService executorService = Executors.newFixedThreadPool(5); private static byte[] bytes; private static byte[] expectedChecksum; + private static byte[] expectedChecksumForEmptyString; @BeforeAll public static void setup() throws Exception { @@ -111,6 +113,7 @@ public static void setup() throws Exception { testFile = new RandomTempFile(OBJ_SIZE); bytes = Files.readAllBytes(testFile.toPath()); expectedChecksum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath())); + expectedChecksumForEmptyString = ChecksumUtils.computeCheckSum(new StringInputStream("")); mpuS3Client = S3AsyncClient .builder() .region(DEFAULT_REGION) @@ -137,8 +140,14 @@ public static Stream asyncRequestBodies() { executorService)), Arguments.of("inputStream_unknownLength", AsyncRequestBody.fromInputStream(new ByteArrayInputStream(bytes), null, - executorService)) - ); + executorService))); + } + + public static Stream emptyAsyncRequestBodies() { + return Stream.of(Arguments.of("knownLength", AsyncRequestBody.fromString("")), + Arguments.of("unknownLength", + AsyncRequestBody.fromInputStream(new StringInputStream(""), null, + executorService))); } @BeforeEach @@ -181,6 +190,17 @@ void putObject_variousRequestBody_objectSentCorrectly(String description, AsyncR assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedChecksum); } + @ParameterizedTest + @MethodSource("emptyAsyncRequestBodies") + void putObject_variousEmptyRequestBody_objectSentCorrectly(String description, AsyncRequestBody body) throws Exception { + mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join(); + + ResponseInputStream objContent = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + assertThat(objContent.response().contentLength()).isEqualTo(0); + assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedChecksumForEmptyString); + } + @ParameterizedTest @MethodSource("asyncRequestBodies") diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java index cab480a540cb..25e4b363e47c 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java @@ -92,11 +92,11 @@ public CompletableFuture uploadObject(PutObjectRequest putObj return returnFuture; } - private class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber { + final class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber { /** * Indicates whether this is the first async request body or not. */ - private final AtomicBoolean isFirstAsyncRequestBody = new AtomicBoolean(true); + private final AtomicBoolean firstAsyncRequestBodyReceived = new AtomicBoolean(false); /** * Indicates whether CreateMultipartUpload has been initiated or not @@ -163,9 +163,17 @@ public void onSubscribe(Subscription s) { @Override public void onNext(CloseableAsyncRequestBody asyncRequestBody) { + if (asyncRequestBody == null) { + NullPointerException exception = new NullPointerException("asyncRequestBody passed to onNext MUST NOT be null."); + multipartUploadHelper.failRequestsElegantly(futures, + exception, uploadId, returnFuture, putObjectRequest); + return; + } + if (isDone) { return; } + int currentPartNum = partNumber.incrementAndGet(); log.trace(() -> "Received asyncRequestBody " + asyncRequestBody.contentLength()); asyncRequestBodyInFlight.incrementAndGet(); @@ -178,7 +186,7 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) { return; } - if (isFirstAsyncRequestBody.compareAndSet(true, false)) { + if (firstAsyncRequestBodyReceived.compareAndSet(false, true)) { log.trace(() -> "Received first async request body"); // If this is the first AsyncRequestBody received, request another one because we don't know if there is more firstRequestBody = asyncRequestBody; @@ -276,10 +284,13 @@ public void onError(Throwable t) { @Override public void onComplete() { log.debug(() -> "Received onComplete()"); - // If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload + // If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload, and if no + // async request body has been received, it's an empty stream if (createMultipartUploadInitiated.get() == false) { log.debug(() -> "Starting the upload as a single object upload request"); - multipartUploadHelper.uploadInOneChunk(putObjectRequest, firstRequestBody, returnFuture); + AsyncRequestBody entireRequestBody = firstAsyncRequestBodyReceived.get() ? firstRequestBody : + AsyncRequestBody.empty(); + multipartUploadHelper.uploadInOneChunk(putObjectRequest, entireRequestBody, returnFuture); } else { isDone = true; completeMultipartUploadIfFinish(asyncRequestBodyInFlight.get()); diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelperTest.java index 8a4d127c2113..39e60f909b2d 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelperTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelperTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulCompleteMultipartCall; import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulCreateMultipartCall; +import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulPutObjectCall; import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulUploadPartCalls; import java.io.FileInputStream; @@ -55,6 +56,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.utils.StringInputStream; public class UploadWithUnknownContentLengthHelperTest { private static final String BUCKET = "bucket"; @@ -116,6 +118,31 @@ void uploadObject_withMissingContentLength_shouldFailRequest() { verifyFailureWithMessage(future, "Content length is missing on the AsyncRequestBody for part number"); } + @Test + void uploadObject_emptyBody_shouldSucceed() { + stubSuccessfulPutObjectCall(s3AsyncClient); + + BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(null); + CompletableFuture future = helper.uploadObject(createPutObjectRequest(), body); + body.writeInputStream(new StringInputStream("")); + future.join(); + + ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + ArgumentCaptor requestBodyArgumentCaptor = ArgumentCaptor.forClass(AsyncRequestBody.class); + verify(s3AsyncClient, times(1)).putObject(requestArgumentCaptor.capture(), + requestBodyArgumentCaptor.capture()); + + List actualRequests = requestArgumentCaptor.getAllValues(); + List actualRequestBodies = requestBodyArgumentCaptor.getAllValues(); + assertThat(actualRequestBodies).hasSize(1); + assertThat(actualRequests).hasSize(1); + + PutObjectRequest putObjectRequest = actualRequests.get(0); + assertThat(putObjectRequest.bucket()).isEqualTo(BUCKET); + assertThat(putObjectRequest.key()).isEqualTo(KEY); + assertThat(actualRequestBodies.get(0).contentLength()).hasValue(0L); + } + @Test void uploadObject_withPartSizeExceedingLimit_shouldFailRequest() { CloseableAsyncRequestBody asyncRequestBody = createMockAsyncRequestBody(PART_SIZE + 1); @@ -123,6 +150,30 @@ void uploadObject_withPartSizeExceedingLimit_shouldFailRequest() { verifyFailureWithMessage(future, "Content length must not be greater than part size"); } + @Test + void uploadObject_nullAsyncRequestBody_shouldFailRequest() { + CloseableAsyncRequestBody asyncRequestBody = createMockAsyncRequestBody(PART_SIZE); + SdkPublisher mockPublisher = mock(SdkPublisher.class); + when(asyncRequestBody.splitCloseable(any(Consumer.class))).thenReturn(mockPublisher); + + ArgumentCaptor> subscriberCaptor = ArgumentCaptor.forClass(Subscriber.class); + CompletableFuture future = helper.uploadObject(createPutObjectRequest(), asyncRequestBody); + + verify(mockPublisher).subscribe(subscriberCaptor.capture()); + Subscriber subscriber = subscriberCaptor.getValue(); + + Subscription subscription = mock(Subscription.class); + subscriber.onSubscribe(subscription); + subscriber.onNext(null); + + assertThat(future).isCompletedExceptionally(); + future.exceptionally(throwable -> { + assertThat(throwable.getCause()).isInstanceOf(NullPointerException.class); + assertThat(throwable.getCause().getMessage()).contains("MUST NOT be null"); + return null; + }).join(); + } + private PutObjectRequest createPutObjectRequest() { return PutObjectRequest.builder() .bucket(BUCKET) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/utils/MultipartUploadTestUtils.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/utils/MultipartUploadTestUtils.java index 9a97bf70c1e8..6553b1bfb9d5 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/utils/MultipartUploadTestUtils.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/utils/MultipartUploadTestUtils.java @@ -29,6 +29,8 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.multipart.S3ResumeToken; @@ -69,6 +71,23 @@ public static void stubSuccessfulCompleteMultipartCall(String bucket, String key .thenReturn(completeMultipartUploadFuture); } + public static void stubSuccessfulPutObjectCall(S3AsyncClient s3AsyncClient) { + when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) + .thenAnswer(new Answer>() { + + @Override + public CompletableFuture answer(InvocationOnMock invocationOnMock) { + AsyncRequestBody asyncRequestBody = invocationOnMock.getArgument(1); + // Draining the request body + asyncRequestBody.subscribe(b -> { + }); + + return CompletableFuture.completedFuture(PutObjectResponse.builder() + .build()); + } + }); + } + public static void stubSuccessfulUploadPartCalls(S3AsyncClient s3AsyncClient) { when(s3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class))) .thenAnswer(new Answer>() {