Skip to content

Commit

Permalink
correctly set total parts in MultipartDownloadResumeContext. Add LogC…
Browse files Browse the repository at this point in the history
…aptor assertion to integ test.
  • Loading branch information
L-Applin committed Jun 6, 2024
1 parent 7098e7a commit 84572a5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import java.io.File;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.Waiter;
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.testutils.LogCaptor;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.FileDownload;
Expand All @@ -44,12 +48,10 @@ public class S3TransferManagerMultipartDownloadPauseResumeIntegrationTest extend

@BeforeAll
public static void setup() throws Exception {
System.out.println("CREATING BUCKET");
createBucket(BUCKET);
sourceFile = new RandomTempFile(OBJ_SIZE);

// use async client for multipart upload (with default part size)
System.out.println("UPLOADING TEST OBJECT");
s3Async.putObject(PutObjectRequest.builder()
.bucket(BUCKET)
.key(KEY)
Expand Down Expand Up @@ -90,9 +92,19 @@ void pauseAndResume_whenAlreadyComplete_shouldHandleGracefully() {
FileDownload download = tmJava.downloadFile(request);
download.completionFuture().join();
ResumableFileDownload resume = download.pause();
FileDownload resumedDownload = tmJava.resumeDownloadFile(resume);
assertThat(resumedDownload.completionFuture()).isCompleted();
assertThat(path.toFile()).hasSameBinaryContentAs(sourceFile);
try (LogCaptor logCaptor = LogCaptor.create(Level.DEBUG)) {
FileDownload resumedDownload = tmJava.resumeDownloadFile(resume);
assertThat(resumedDownload.completionFuture()).isCompleted();
assertThat(path.toFile()).hasSameBinaryContentAs(sourceFile);

List<LogEvent> logEvents = logCaptor.loggedEvents();
assertThat(logEvents).noneMatch(
event -> event.getMessage().getFormattedMessage().contains("Sending downloadFileRequest"));
LogEvent firstLog = logEvents.get(0);
assertThat(firstLog.getMessage().getFormattedMessage())
.contains("The multipart download associated to the provided ResumableFileDownload is already completed, "
+ "nothing to resume");
}
}

private void waitUntilAmountTransferred(FileDownload download, long amountTransferred) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ private CopyObjectRequest attachSdkAttribute(CopyObjectRequest copyObjectRequest
.build();
}

private GetObjectRequest attachSdkAttributes(GetObjectRequest request,
private GetObjectRequest attachSdkAttribute(GetObjectRequest request,
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
request.overrideConfiguration()
Expand Down Expand Up @@ -373,7 +373,7 @@ public final <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downl
public final FileDownload downloadFile(DownloadFileRequest downloadRequest) {
Validate.paramNotNull(downloadRequest, "downloadFileRequest");

GetObjectRequest getObjectRequestWithAttributes = attachSdkAttributes(
GetObjectRequest getObjectRequestWithAttributes = attachSdkAttribute(
downloadRequest.getObjectRequest(),
b -> b.putExecutionAttribute(MULTIPART_DOWNLOAD_RESUME_CONTEXT, new MultipartDownloadResumeContext()));
DownloadFileRequest downloadFileRequestWithAttributes =
Expand Down Expand Up @@ -429,6 +429,8 @@ public final FileDownload resumeDownloadFile(ResumableFileDownload resumableFile
Optional<MultipartDownloadResumeContext> optCtx =
multipartDownloadResumeContext(resumableFileDownload.downloadFileRequest().getObjectRequest());
if (optCtx.map(MultipartDownloadResumeContext::isComplete).orElse(false)) {
log.debug(() -> "The multipart download associated to the provided ResumableFileDownload is already completed, "
+ "nothing to resume");
return completedDownload(resumableFileDownload, optCtx.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class MultipartDownloadResumeContext {
private Long bytesToLastCompletedParts;

/**
* The total amount of part of the multipart download;
* The total number of parts of the multipart download.
*/
private Integer totalParts;

Expand Down Expand Up @@ -78,6 +78,14 @@ public void addToBytesToLastCompletedParts(long bytes) {
bytesToLastCompletedParts += bytes;
}

public void totalParts(int totalParts) {
this.totalParts = totalParts;
}

public Integer totalParts() {
return totalParts;
}

public GetObjectResponse response() {
return this.response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ private void requestMoreIfNeeded(GetObjectResponse response) {
Integer partCount = response.partsCount();
if (partCount != null && totalParts == null) {
log.trace(() -> String.format("total parts: %d", partCount));
MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest)
.ifPresent(ctx -> ctx.totalParts(partCount));
totalParts = partCount;
}
synchronized (lock) {
Expand Down

0 comments on commit 84572a5

Please sign in to comment.