Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-7347c3a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fix NPE issue thrown when using multipart S3 client to upload an object containing empty content without supplying a content length. See [#6464](https://github.com/aws/aws-sdk-java-v2/issues/6464)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.Md5Utils;
import software.amazon.awssdk.utils.StringInputStream;

@Timeout(value = 60, unit = SECONDS)
public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTestBase {
Expand All @@ -102,6 +103,7 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest
private static ExecutorService executorService = Executors.newFixedThreadPool(5);
private static byte[] bytes;
private static byte[] expectedChecksum;
private static byte[] expectedChecksumForEmptyString;

@BeforeAll
public static void setup() throws Exception {
Expand All @@ -111,6 +113,7 @@ public static void setup() throws Exception {
testFile = new RandomTempFile(OBJ_SIZE);
bytes = Files.readAllBytes(testFile.toPath());
expectedChecksum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));
expectedChecksumForEmptyString = ChecksumUtils.computeCheckSum(new StringInputStream(""));
mpuS3Client = S3AsyncClient
.builder()
.region(DEFAULT_REGION)
Expand All @@ -137,8 +140,14 @@ public static Stream<Arguments> asyncRequestBodies() {
executorService)),
Arguments.of("inputStream_unknownLength",
AsyncRequestBody.fromInputStream(new ByteArrayInputStream(bytes), null,
executorService))
);
executorService)));
}

public static Stream<Arguments> emptyAsyncRequestBodies() {
return Stream.of(Arguments.of("knownLength", AsyncRequestBody.fromString("")),
Arguments.of("unknownLength",
AsyncRequestBody.fromInputStream(new StringInputStream(""), null,
executorService)));
}

@BeforeEach
Expand Down Expand Up @@ -181,6 +190,17 @@ void putObject_variousRequestBody_objectSentCorrectly(String description, AsyncR
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedChecksum);
}

@ParameterizedTest
@MethodSource("emptyAsyncRequestBodies")
void putObject_variousEmptyRequestBody_objectSentCorrectly(String description, AsyncRequestBody body) throws Exception {
mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();

ResponseInputStream<GetObjectResponse> objContent = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
ResponseTransformer.toInputStream());
assertThat(objContent.response().contentLength()).isEqualTo(0);
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedChecksumForEmptyString);
}


@ParameterizedTest
@MethodSource("asyncRequestBodies")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObj
return returnFuture;
}

private class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {
final class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {
/**
* Indicates whether this is the first async request body or not.
*/
private final AtomicBoolean isFirstAsyncRequestBody = new AtomicBoolean(true);
private final AtomicBoolean firstAsyncRequestBodyReceived = new AtomicBoolean(false);

/**
* Indicates whether CreateMultipartUpload has been initiated or not
Expand Down Expand Up @@ -163,9 +163,17 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
if (asyncRequestBody == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add isDone = true here just like with onError() ?

NullPointerException exception = new NullPointerException("asyncRequestBody passed to onNext MUST NOT be null.");
multipartUploadHelper.failRequestsElegantly(futures,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noob question, I assume there is a reason we use failRequestsElegantly here. The Reactive Streams spec requires throwing NPE when onNext() receives null. Is there a reason we complete the future exceptionally but don't throw to the caller?

exception, uploadId, returnFuture, putObjectRequest);
return;
}

if (isDone) {
return;
}

int currentPartNum = partNumber.incrementAndGet();
log.trace(() -> "Received asyncRequestBody " + asyncRequestBody.contentLength());
asyncRequestBodyInFlight.incrementAndGet();
Expand All @@ -178,7 +186,7 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
return;
}

if (isFirstAsyncRequestBody.compareAndSet(true, false)) {
if (firstAsyncRequestBodyReceived.compareAndSet(false, true)) {
log.trace(() -> "Received first async request body");
// If this is the first AsyncRequestBody received, request another one because we don't know if there is more
firstRequestBody = asyncRequestBody;
Expand Down Expand Up @@ -276,10 +284,13 @@ public void onError(Throwable t) {
@Override
public void onComplete() {
log.debug(() -> "Received onComplete()");
// If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload
// If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload, and if no
// async request body has been received, it's an empty stream
if (createMultipartUploadInitiated.get() == false) {
log.debug(() -> "Starting the upload as a single object upload request");
multipartUploadHelper.uploadInOneChunk(putObjectRequest, firstRequestBody, returnFuture);
AsyncRequestBody entireRequestBody = firstAsyncRequestBodyReceived.get() ? firstRequestBody :
AsyncRequestBody.empty();
multipartUploadHelper.uploadInOneChunk(putObjectRequest, entireRequestBody, returnFuture);
} else {
isDone = true;
completeMultipartUploadIfFinish(asyncRequestBodyInFlight.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.when;
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulCompleteMultipartCall;
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulCreateMultipartCall;
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulPutObjectCall;
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartUploadTestUtils.stubSuccessfulUploadPartCalls;

import java.io.FileInputStream;
Expand Down Expand Up @@ -55,6 +56,7 @@
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.utils.StringInputStream;

public class UploadWithUnknownContentLengthHelperTest {
private static final String BUCKET = "bucket";
Expand Down Expand Up @@ -116,13 +118,62 @@ void uploadObject_withMissingContentLength_shouldFailRequest() {
verifyFailureWithMessage(future, "Content length is missing on the AsyncRequestBody for part number");
}

@Test
void uploadObject_emptyBody_shouldSucceed() {
stubSuccessfulPutObjectCall(s3AsyncClient);

BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(null);
CompletableFuture<PutObjectResponse> future = helper.uploadObject(createPutObjectRequest(), body);
body.writeInputStream(new StringInputStream(""));
future.join();

ArgumentCaptor<PutObjectRequest> requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
ArgumentCaptor<AsyncRequestBody> requestBodyArgumentCaptor = ArgumentCaptor.forClass(AsyncRequestBody.class);
verify(s3AsyncClient, times(1)).putObject(requestArgumentCaptor.capture(),
requestBodyArgumentCaptor.capture());

List<PutObjectRequest> actualRequests = requestArgumentCaptor.getAllValues();
List<AsyncRequestBody> actualRequestBodies = requestBodyArgumentCaptor.getAllValues();
assertThat(actualRequestBodies).hasSize(1);
assertThat(actualRequests).hasSize(1);

PutObjectRequest putObjectRequest = actualRequests.get(0);
assertThat(putObjectRequest.bucket()).isEqualTo(BUCKET);
assertThat(putObjectRequest.key()).isEqualTo(KEY);
assertThat(actualRequestBodies.get(0).contentLength()).hasValue(0L);
}

@Test
void uploadObject_withPartSizeExceedingLimit_shouldFailRequest() {
CloseableAsyncRequestBody asyncRequestBody = createMockAsyncRequestBody(PART_SIZE + 1);
CompletableFuture<PutObjectResponse> future = setupAndTriggerUploadFailure(asyncRequestBody);
verifyFailureWithMessage(future, "Content length must not be greater than part size");
}

@Test
void uploadObject_nullAsyncRequestBody_shouldFailRequest() {
CloseableAsyncRequestBody asyncRequestBody = createMockAsyncRequestBody(PART_SIZE);
SdkPublisher<CloseableAsyncRequestBody> mockPublisher = mock(SdkPublisher.class);
when(asyncRequestBody.splitCloseable(any(Consumer.class))).thenReturn(mockPublisher);

ArgumentCaptor<Subscriber<CloseableAsyncRequestBody>> subscriberCaptor = ArgumentCaptor.forClass(Subscriber.class);
CompletableFuture<PutObjectResponse> future = helper.uploadObject(createPutObjectRequest(), asyncRequestBody);

verify(mockPublisher).subscribe(subscriberCaptor.capture());
Subscriber<CloseableAsyncRequestBody> subscriber = subscriberCaptor.getValue();

Subscription subscription = mock(Subscription.class);
subscriber.onSubscribe(subscription);
subscriber.onNext(null);

assertThat(future).isCompletedExceptionally();
future.exceptionally(throwable -> {
assertThat(throwable.getCause()).isInstanceOf(NullPointerException.class);
assertThat(throwable.getCause().getMessage()).contains("MUST NOT be null");
return null;
}).join();
}

private PutObjectRequest createPutObjectRequest() {
return PutObjectRequest.builder()
.bucket(BUCKET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.multipart.S3ResumeToken;
Expand Down Expand Up @@ -69,6 +71,23 @@ public static void stubSuccessfulCompleteMultipartCall(String bucket, String key
.thenReturn(completeMultipartUploadFuture);
}

public static void stubSuccessfulPutObjectCall(S3AsyncClient s3AsyncClient) {
when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
.thenAnswer(new Answer<CompletableFuture<PutObjectResponse>>() {

@Override
public CompletableFuture<PutObjectResponse> answer(InvocationOnMock invocationOnMock) {
AsyncRequestBody asyncRequestBody = invocationOnMock.getArgument(1);
// Draining the request body
asyncRequestBody.subscribe(b -> {
});

return CompletableFuture.completedFuture(PutObjectResponse.builder()
.build());
}
});
}

public static void stubSuccessfulUploadPartCalls(S3AsyncClient s3AsyncClient) {
when(s3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)))
.thenAnswer(new Answer<CompletableFuture<UploadPartResponse>>() {
Expand Down
Loading