Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle file upload future completed exceptionally for Java-based TM #5543

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
import software.amazon.awssdk.utils.Lazy;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;

@SdkInternalApi
public final class CrtFileUpload implements FileUpload {
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
davidh44 marked this conversation as resolved.
Show resolved Hide resolved

private final Lazy<ResumableFileUpload> resumableFileUpload;
private final CompletableFuture<CompletedFileUpload> completionFuture;
private final TransferProgress progress;
Expand All @@ -57,7 +61,14 @@ public ResumableFileUpload pause() {

private ResumableFileUpload doPause() {
File sourceFile = request.source().toFile();
if (completionFuture.isDone()) {

boolean futureCompletedExceptionally = completionFuture.isCompletedExceptionally();
davidh44 marked this conversation as resolved.
Show resolved Hide resolved
if (completionFuture.isDone()
// TODO - uncomment once CRT handles future completed exceptionally to return ResumeToken
//&& !futureCompletedExceptionally
) {
log.debug(() -> "The upload future was completed. There will be no ResumeToken returned.");

Instant fileLastModified = Instant.ofEpochMilli(sourceFile.lastModified());
return ResumableFileUpload.builder()
.fileLastModified(fileLastModified)
Expand All @@ -80,15 +91,23 @@ private ResumableFileUpload doPause() {
}

completionFuture.cancel(true);
// Upload hasn't started yet, or it's a single object upload
if (token == null) {
if (futureCompletedExceptionally) {
log.debug(() -> "The upload future was completed exceptionally and the ResumeToken returned by the "
+ "S3 MetaRequest was null.");
} else {
log.debug(() -> "The upload hasn't started yet or it's a single object upload. There will be no ResumeToken "
+ "returned");
}

return ResumableFileUpload.builder()
.fileLastModified(fileLastModified)
.fileLength(sourceFile.length())
.uploadFileRequest(request)
.build();
}

log.debug(() -> "The upload was successfully paused and a ResumeToken was returned.");
davidh44 marked this conversation as resolved.
Show resolved Hide resolved
return ResumableFileUpload.builder()
.multipartUploadId(token.getUploadId())
.totalParts(token.getTotalNumParts())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.services.s3.multipart.PauseObservable;
import software.amazon.awssdk.services.s3.multipart.S3ResumeToken;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
import software.amazon.awssdk.utils.Lazy;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;

@SdkInternalApi
public final class DefaultFileUpload implements FileUpload {
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
davidh44 marked this conversation as resolved.
Show resolved Hide resolved

private final Lazy<ResumableFileUpload> resumableFileUpload;
private final CompletableFuture<CompletedFileUpload> completionFuture;
private final TransferProgress progress;
Expand Down Expand Up @@ -70,17 +74,29 @@ private ResumableFileUpload doPause() {
.fileLength(sourceFile.length())
.uploadFileRequest(request);

if (completionFuture.isDone()) {
boolean futureCompletedExceptionally = completionFuture.isCompletedExceptionally();
if (completionFuture.isDone() && !futureCompletedExceptionally) {
log.debug(() -> "The upload future was finished and was not completed exceptionally. There will be no S3ResumeToken "
+ "returned.");

return resumableFileBuilder.build();
}

S3ResumeToken token = pauseObservable.pause();

// Upload hasn't started yet, or it's a single object upload
if (token == null) {
log.debug(() -> "The upload hasn't started yet, or it's a single object upload. There will be no S3ResumeToken "
+ "returned.");
return resumableFileBuilder.build();
}

if (futureCompletedExceptionally) {
log.debug(() -> "The upload future was completed exceptionally but has been successfully paused and a S3ResumeToken "
+ "was returned.");
} else {
log.debug(() -> "The upload was successfully paused and a S3ResumeToken was returned.");
}

return resumableFileBuilder.multipartUploadId(token.uploadId())
.totalParts(token.totalNumParts())
.transferredParts(token.numPartsCompleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private ResumableRequestConverter() {
}

if (hasRemainingParts(getObjectRequest)) {
// multipart GET for the remaining parts
log.debug(() -> "The paused download was performed with part GET, now resuming download of remaining parts");
Long positionToWriteFrom =
MultipartDownloadUtils.multipartDownloadResumeContext(originalDownloadRequest.getObjectRequest())
.map(MultipartDownloadResumeContext::bytesToLastCompletedParts)
Expand All @@ -92,7 +92,7 @@ private ResumableRequestConverter() {
return Pair.of(originalDownloadRequest, responseTransformer);
}

// ranged GET for the remaining bytes.
log.debug(() -> "The paused download was performed with range GET, now resuming download of remaining bytes.");
newDownloadFileRequest = resumedDownloadFileRequest(resumableFileDownload,
originalDownloadRequest,
getObjectRequest,
Expand Down
Loading