S3TransferManager: The TransferListener.transferComplete() is not invoked when AsyncRequestBody.fromFile() is used #3839

Closed
artembilan opened this issue Mar 15, 2023 · 4 comments
Labels
bug This issue is a bug. p2 This is a standard priority issue transfer-manager

Comments

@artembilan

Describe the bug

The FileAsyncRequestBody does not call subscriber.onComplete() because it is cancelled from the NettyRequestExecutor.StreamedRequest via this code:

    if (!shouldContinuePublishing()) {
        done = true;
        subscription.cancel();
        subscriber.onComplete();
    }
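
To make the mechanism concrete, here is a minimal sketch built on the JDK's java.util.concurrent.Flow. It is not the SDK's code: every class name in it is hypothetical, and it assumes that a file-backed publisher defers onComplete() until the next request(), while a byte[]-backed publisher signals onComplete() right after its only chunk. Under those assumptions, a consumer that cancels as soon as it has the expected number of bytes never lets the deferred publisher report completion, so anything wrapped around the publisher (such as a progress listener) never sees it.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelBeforeCompleteDemo {

    /** Hypothetical publisher: one chunk, with eager or deferred completion. */
    static final class OneChunkPublisher implements Flow.Publisher<byte[]> {
        private final byte[] chunk;
        private final boolean completeEagerly;

        OneChunkPublisher(byte[] chunk, boolean completeEagerly) {
            this.chunk = chunk;
            this.completeEagerly = completeEagerly;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super byte[]> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean emitted;
                private boolean finished;

                @Override
                public void request(long n) {
                    if (finished || n <= 0) {
                        return;
                    }
                    if (completeEagerly) {
                        finished = true;
                        subscriber.onNext(chunk);
                        subscriber.onComplete(); // assumed byte[]-like behavior
                    } else if (!emitted) {
                        emitted = true;
                        subscriber.onNext(chunk); // completion waits for the next request()
                    } else {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    /** Hypothetical consumer that cancels once the expected byte count has arrived. */
    static final class LengthBoundConsumer implements Flow.Subscriber<byte[]> {
        private final long expectedLength;
        private long received;
        private Flow.Subscription subscription;

        LengthBoundConsumer(long expectedLength) {
            this.expectedLength = expectedLength;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(byte[] bytes) {
            received += bytes.length;
            if (received >= expectedLength) {
                subscription.cancel(); // mirrors the cancel in the snippet above
            } else {
                subscription.request(1);
            }
        }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() { }
    }

    /** Returns true if the publisher itself signaled onComplete(). */
    public static boolean completionObserved(boolean completeEagerly) {
        byte[] data = "test data".getBytes(StandardCharsets.UTF_8);
        AtomicBoolean completed = new AtomicBoolean();
        Flow.Subscriber<byte[]> consumer = new LengthBoundConsumer(data.length);
        // Probe standing in for a listener that only hears the publisher's own signals.
        new OneChunkPublisher(data, completeEagerly).subscribe(new Flow.Subscriber<byte[]>() {
            @Override public void onSubscribe(Flow.Subscription s) { consumer.onSubscribe(s); }
            @Override public void onNext(byte[] b) { consumer.onNext(b); }
            @Override public void onError(Throwable t) { consumer.onError(t); }
            @Override public void onComplete() { completed.set(true); consumer.onComplete(); }
        });
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("eager completion (byte[]-like): " + completionObserved(true));
        System.out.println("deferred completion (file-like): " + completionObserved(false));
    }
}
```

In this model the eager variant reports completion and the deferred variant does not, which matches the difference between fromBytes() and fromFile() described in this report. Whether the real FileAsyncRequestBody defers completion in exactly this way is an assumption of the sketch.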

Expected Behavior

The FileAsyncRequestBody completes normally, and therefore TransferListener.transferComplete() is invoked.

Current Behavior

The TransferListener.transferComplete() is not called when we upload the file.

It works with a plain byte[] body (AsyncRequestBody.fromBytes()), though.

Reproduction Steps

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;

import static org.assertj.core.api.Assertions.assertThat;

public class S3TransferFileWithProgressTests {

	static final LocalStackContainer LOCAL_STACK_CONTAINER =
			new LocalStackContainer(
					DockerImageName.parse("localstack/localstack:1.4.0"))
					.withServices(LocalStackContainer.Service.S3);

	@TempDir
	static Path temporaryFolder;

	@BeforeAll
	static void startContainer() {
		LOCAL_STACK_CONTAINER.start();
	}

	@Test
	void theTransferListenerIsUpdatedWhenFileHasBeenUploaded() throws IOException, InterruptedException {
		S3AsyncClient s3AsyncClient =
				S3AsyncClient.builder()
						.region(Region.of(LOCAL_STACK_CONTAINER.getRegion()))
						.credentialsProvider(StaticCredentialsProvider.create(
								AwsBasicCredentials.create(LOCAL_STACK_CONTAINER.getAccessKey(),
										LOCAL_STACK_CONTAINER.getSecretKey())))
						.endpointOverride(LOCAL_STACK_CONTAINER.getEndpointOverride(LocalStackContainer.Service.S3))
						.build();

		S3TransferManager s3TransferManager = S3TransferManager.builder().s3Client(s3AsyncClient).build();

		File file = new File(temporaryFolder.toFile(), "test.txt");
		file.createNewFile();
		byte[] testData = "test data".getBytes();
		try (OutputStream out = new FileOutputStream(file)) {
			out.write(testData);
		}

		CountDownLatch transferInitiated = new CountDownLatch(1);
		CountDownLatch bytesTransferred = new CountDownLatch(1);
		CountDownLatch transferComplete = new CountDownLatch(1);

		TransferListener transferListener = new TransferListener() {
			@Override
			public void transferInitiated(Context.TransferInitiated context) {
				transferInitiated.countDown();
			}

			@Override
			public void bytesTransferred(Context.BytesTransferred context) {
				bytesTransferred.countDown();
			}

			@Override
			public void transferComplete(Context.TransferComplete context) {
				transferComplete.countDown();
			}

		};

		String testBucket = "test-bucket";

		s3AsyncClient.createBucket(request -> request.bucket(testBucket)).join();

		UploadRequest.Builder uploadRequest =
				UploadRequest.builder()
						.putObjectRequest(request -> request.bucket(testBucket).key(file.getName()))
						.requestBody(AsyncRequestBody.fromFile(file))
//						.requestBody(AsyncRequestBody.fromBytes(testData))
						.addTransferListener(transferListener)
						.addTransferListener(LoggingTransferListener.create());

		s3TransferManager.upload(uploadRequest.build()).completionFuture().join();

		assertThat(transferInitiated.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(bytesTransferred.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(transferComplete.await(10, TimeUnit.SECONDS)).isTrue();

		ResponseBytes<GetObjectResponse> responseBytes =
				s3AsyncClient.getObject(request -> request.bucket(testBucket).key(file.getName()),
								AsyncResponseTransformer.toBytes())
						.join();

		assertThat(responseBytes.asByteArray()).isEqualTo(testData);
	}

}

Uncomment the AsyncRequestBody.fromBytes() line and the test passes.

Possible Solution

No response

Additional Information/Context

No response

AWS Java SDK version used

2.20.22

JDK version used

openjdk version "17.0.5" 2022-10-18

Operating System and version

Windows 10

@artembilan artembilan added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Mar 15, 2023
artembilan added a commit to spring-projects/spring-integration-aws that referenced this issue Mar 16, 2023
Additional change is made to workaround AWS SDK bug: aws/aws-sdk-java-v2#3839
@debora-ito debora-ito added needs-review This issue or PR needs review from the team. and removed needs-triage This issue or PR still needs to be triaged. labels Mar 20, 2023
@debora-ito debora-ito self-assigned this Mar 20, 2023
@debora-ito
Member

@artembilan thank you for reporting this, bug acknowledged.

@debora-ito debora-ito removed the needs-review This issue or PR needs review from the team. label Mar 21, 2023
@debora-ito debora-ito removed their assignment Mar 21, 2023
@debora-ito debora-ito added transfer-manager p2 This is a standard priority issue labels Mar 21, 2023
@zoewangg
Contributor

This is fixed in #3839, could you try with the latest version?

@zoewangg zoewangg added the closing-soon This issue will close in 4 days unless further comments are made. label Feb 24, 2024
artembilan added a commit to spring-projects/spring-integration-aws that referenced this issue Feb 26, 2024
@artembilan
Author

Confirmed as working now.

You can close the issue accordingly.

Thanks

@github-actions github-actions bot removed the closing-soon This issue will close in 4 days unless further comments are made. label Feb 26, 2024

This issue is now closed. Comments on closed issues are hard for our team to see.
If you need more assistance, please open a new issue that references this one.

Griffin1989106 added a commit to Griffin1989106/SpringWithAWS that referenced this issue Jul 22, 2024
Additional change is made to workaround AWS SDK bug: aws/aws-sdk-java-v2#3839
Griffin1989106 added a commit to Griffin1989106/SpringWithAWS that referenced this issue Jul 22, 2024