
Support Reactive Streams API for S3 protocol #198

Open
migroskub opened this issue Dec 17, 2021 · 13 comments

Comments

@migroskub

Hey. I would like to suggest support for Project Reactor when integrating with AWS and Spring Integration: the ability to write a Flux to S3, consume a Mono, and more. Here is a good example.

@artembilan
Member

<awssdk.version>2.10.27</awssdk.version>

We are aware of the Reactive Streams API in the AWS SDK v2. The problem is that we have to migrate to that version first; only then can we see what we could do with Reactive Streams support: #155.

I'm not closing this issue in favor of that one, but rephrasing it for S3 specifics, so we can come back to it when we do the migration to AWS SDK v2.
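For context on what the SDK v2 Reactive Streams support mentioned above looks like, here is a minimal sketch of adapting its S3AsyncClient to Reactor. This is illustrative only (the project has not migrated to v2 yet); the class name ReactiveS3Upload and the bucket/key parameters are placeholders, and it assumes software.amazon.awssdk:s3 and io.projectreactor:reactor-core on the classpath.

```java
import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class ReactiveS3Upload {

    private final S3AsyncClient s3 = S3AsyncClient.create();

    public Mono<PutObjectResponse> upload(String bucket, String key, byte[] body) {
        PutObjectRequest request = PutObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .build();
        // putObject() returns a CompletableFuture, which Reactor can
        // adapt directly; the Supplier variant defers the call until
        // the Mono is subscribed to.
        return Mono.fromFuture(() -> s3.putObject(request, AsyncRequestBody.fromBytes(body)));
    }
}
```

Because the v2 client is natively asynchronous, no extra scheduler is needed here; the returned Mono completes when the SDK's CompletableFuture does.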

@artembilan artembilan changed the title Project Reactor Support Support Reactive Streams API for S3 protocol Dec 17, 2021
@migroskub
Author

OK, I didn't know about the SDK v2 capabilities. Sounds great. Can you share the timeline expectations for the SDK v2 migration?

@artembilan
Member

Read the issue I pointed to in my answer,
and follow the links to the other related issues.
I cannot judge from here until we get some answers in the Spring Cloud AWS project.

@lynch19

lynch19 commented Dec 18, 2021

+1

@almogtavor

Are there any classes that can already work with an internal implementation based on v2? Or is it the case that none of the existing classes support Reactor?

@artembilan
Member

Technically, even the existing AWS SDK v1 is compatible with Reactive Streams: it has async variants for AWS services, which we do use in this Spring Integration for AWS project. So you are good to develop a reactive solution even right now.
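One way to use the v1 SDK from a reactive pipeline today, as suggested above, is to offload a blocking client call to a dedicated Reactor scheduler so the main stream is never blocked. A minimal sketch, assuming aws-java-sdk-s3 and reactor-core on the classpath; the class name BlockingS3Adapter and the bucket/key/file parameters are placeholders:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.PutObjectResult;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.File;

public class BlockingS3Adapter {

    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

    public Mono<PutObjectResult> upload(String bucket, String key, File file) {
        // The blocking putObject() call runs on the boundedElastic
        // scheduler, which is designed for wrapping blocking I/O;
        // downstream subscribers just wait for the result asynchronously.
        return Mono.fromCallable(() -> s3.putObject(bucket, key, file))
                .subscribeOn(Schedulers.boundedElastic());
    }
}
```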

@almogtavor

Can you point to the classes that are relevant for the asynchronous operations?

@artembilan
Member

@jifeiiii

@artembilan So does the outbound channel adapter support Reactor or not? The docs (which don't appear on the website) don't mention this. Why?

@jifeiiii

I'd really like to see an example of using the OutboundGateway for S3 in the Java DSL.

@artembilan
Member

It would help if you could define what, in your opinion, counts as Reactor compatibility.
As I said before, the AWS API is async in most cases, so it can be adapted to Reactor Flux and Mono.
There are no such docs simply because these components do not expose a reactive API directly.
When they do, it will be reflected in the docs.
However, I treat any async API as compatible with Reactive Streams.
Moreover, even a blocking API can be shifted to a dedicated reactive scheduler so that it does not block the whole stream, but waits for the result asynchronously.

The S3MessageHandler can be configured as a bean and used in the .handle() of the Java DSL.
Its produceReply option has to be set to true to make it work as a gateway.
It then returns a com.amazonaws.services.s3.transfer.Transfer as the result of publishing to S3.
You can adapt that object to Reactive Streams (a Reactor Mono) via its public void addProgressListener(ProgressListener listener); hook.
But that is already outside the Spring Integration scope.

The docs describe how to use an arbitrary channel adapter in the Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters
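The addProgressListener() adaptation described above could be sketched roughly as follows. This is a minimal illustration, not part of Spring Integration; the class name TransferAdapter is a placeholder, and it assumes aws-java-sdk-s3 and reactor-core on the classpath:

```java
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.services.s3.transfer.Transfer;
import reactor.core.publisher.Mono;

public class TransferAdapter {

    /**
     * Wrap the Transfer returned by S3MessageHandler (in gateway mode)
     * into a Mono that completes when the upload finishes.
     */
    public static Mono<Transfer> toMono(Transfer transfer) {
        return Mono.create(sink -> transfer.addProgressListener(event -> {
            if (event.getEventType() == ProgressEventType.TRANSFER_COMPLETED_EVENT) {
                sink.success(transfer);
            }
            else if (event.getEventType() == ProgressEventType.TRANSFER_FAILED_EVENT) {
                sink.error(new IllegalStateException(
                        "S3 transfer failed: " + transfer.getDescription()));
            }
        }));
        // Note: production code should also handle a transfer that is
        // already done (transfer.isDone()) before the listener is added,
        // otherwise the completion event may be missed.
    }
}
```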

@jifeiiii

jifeiiii commented Mar 28, 2022

@artembilan After looking at the code, I see two things that make me wonder whether it is really async and non-blocking (i.e. safe to use with Reactor):

  1. There is no use of S3AsyncClient.
  2. An InputStream is used when passing byte[] to the upload command. Source - https://stackoverflow.com/questions/67476133/upload-a-inputstream-to-aws-s3-asynchronously-non-blocking-using-aws-sdk-for-j

@artembilan
Member

@jifeiiii ,

you probably missed the main point of this issue: we cannot use AWS SDK v2 yet. The client you mention is missing from the AWS SDK version we currently support in this project: https://www.baeldung.com/java-aws-s3-reactive.

See the JavaDocs of the current API we use in the S3MessageHandler:


    /**
     * <p>
     * Schedules a new transfer to upload data to Amazon S3. This method is
     * non-blocking and returns immediately (i.e. before the upload has
     * finished).
     * </p>
     * <p>
     * Use the returned <code>Upload</code> object to query the progress of the
     * transfer, add listeners for progress events, and wait for the upload to
     * complete.
     * </p>
     * <p>
     * If resources are available, the upload will begin immediately.
     * Otherwise, the upload is scheduled and started as soon as
     * resources become available.
     * </p>
     * <p>
     * If you are uploading <a href="http://aws.amazon.com/kms/">Amazon Web Services
     * KMS</a>-encrypted objects, you need to specify the correct region of the
     * bucket on your client and configure Amazon Web Services Signature Version 4 for added
     * security. For more information on how to do this, see
     * http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#
     * specify-signature-version
     * </p>
     *
     * @param putObjectRequest
     *            The request containing all the parameters for the upload.
     *
     * @return A new <code>Upload</code> object to use to check
     * 		   the state of the upload, listen for progress notifications,
     * 		   and otherwise manage the upload.
     *
     * @throws AmazonClientException
     *             If any errors are encountered in the client while making the
     *             request or handling the response.
     * @throws AmazonServiceException
     *             If any errors occurred in Amazon S3 while processing the
     *             request.
     */
    public Upload upload(final PutObjectRequest putObjectRequest)

That's the basis of my assumptions about how this channel adapter could be used with Reactive Streams.

5 participants