Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Splitting transformer #4826

Merged
merged 20 commits into from
Feb 12, 2024
Merged

Conversation

L-Applin
Copy link
Contributor

@L-Applin L-Applin commented Jan 11, 2024

Implementation of the Splitting transformer.

Split an AsyncResponseTranformer into multiple ones, retreived via a Publisher of AsyncResponseTranformer. When requested, the publisher will publish a new AsyncResponseTranformer to its subscriber, but the data received in this individual AsyncResponseTranformer will be buffered and then send to the upstream AsyncResponseTranformer (the one that was split).

Also includes a draft of the subscriber that will use the individually published AsyncResponseTransformer, but is still in a draft state.

The intent is to use the SplittingTransformer to perform multipart get request for s3 objects. The SplittingTranformer would publish an AsyncResponseTranformer instance when requested by the subscriber, which would perform the call to s3 using that AsyncResponseTranformer. By using the split() method on an instance of an existing AsyncResponseTranformer to create a SplittingTranformer, the behaviour of the splitting tranformer can be fine tuned to the different implementations of the AsyncResponseTranformer interface if required.

Diagram

Splitting Transformer

@L-Applin L-Applin marked this pull request as ready for review January 12, 2024 16:14
@L-Applin L-Applin requested a review from a team as a code owner January 12, 2024 16:14

// [WIP]
// Still work in progress, currently only used to help manual testing, please ignore
public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore this class for now, this is only a stub that was used for manually testing with s3.

import software.amazon.awssdk.services.s3.model.GetObjectResponse;

// WIP - please ignore for now, only used in manually testing
class MultipartDownloadIntegrationTest {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore for now, only used for manually testing with s3 objects.

Comment on lines +123 to +124
default SplitAsyncResponseTransformer<ResponseT, ResultT> split(long bufferSize) {
CompletableFuture<ResultT> future = new CompletableFuture<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't the split() method return a SplittingTransformer instead of a SplitAsyncResponseTransformer? Can't the splitting transformer create the future and make it available to the user in the same way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to avoid returning the concrete type SplittingTransformer and instead return the interface type SdkPublisher<AsyncResponseTransformer<T>>. If we return a SplittingTransformer here, we would need to make it @SdkPublicAPI

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reasonSplitAsyncResponseTransformer doesn't implement AsyncResponseTransformer interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SplitAsyncResponseTransformer is just a holder for the publisher and a completable future, the class implementing AsyncResponseTransformer is SplittingTransformer.IndividualTransformer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make it implement AsyncResponseTransformer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, I don't think so. The usage would be:

SplitAsyncResponseTransformer<GetObjectResponse, GetObjectResponse> split = transformer.split(...);
split.publisher().subscribe(downloaderSubscriber);
CompletableFuture<GetObjectResponse> future = split.preparedFuture();
// do whatever we want with the future
GetObjectResponse response = future.join();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just seems a bit odd that the static factory method returns a class that doesn't implement the same interface. Can we make it implement it? We did the same for toPublisher method

https://github.com/aws/aws-sdk-java-v2/blob/master/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java#L235

Copy link
Contributor Author

@L-Applin L-Applin Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.split() is not a static method though, it is an instance method called on a specific instance of an AsyncResponseTransformer just like the split() method on AsyncRequestBody. We could instead return just the SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> like AsyncRequestBody.split. I would need to find another way to deal with the future that needs to be returned from the service call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I'll need to think a bit more on this. Let's keep this conversation open for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a TODO to revisit the naming? It's a bit odd that this class is suffixed with AsyncResponseTransformer but doesn't implement it

Copy link
Contributor

@zoewangg zoewangg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some initial comments. Still going through the PR

- optional maxElement for SplittingTransformer
- white box verification tests for DelegatingBufferingSubscriber and IndividualPartSubscriber
- make DelegatingBufferingSubscriber.onNext iterative
Copy link
Contributor

@zoewangg zoewangg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at DelegatingBufferingSubscriber now

Comment on lines +123 to +124
default SplitAsyncResponseTransformer<ResponseT, ResultT> split(long bufferSize) {
CompletableFuture<ResultT> future = new CompletableFuture<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reasonSplitAsyncResponseTransformer doesn't implement AsyncResponseTransformer interface?

long bufferSize, CompletableFuture<ResultT> returnFuture) {
this.upstreamResponseTransformer = Validate.paramNotNull(upstreamResponseTransformer, "asyncRequestBody");
this.returnFuture = Validate.paramNotNull(returnFuture, "returnFuture");
this.maximumBufferSize = NumericUtils.saturatedCast(bufferSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we support long?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might need to allocate a ByteBuffer of that size in the case we receive a bigger ByteBuffer from the publisher of the individual request body. See DelegatingBufferingSubscriber#L89

@Override
public void onNext(ByteBuffer byteBuffer) {
while (bufferOverflows(byteBuffer)) {
flushStorageToDelegate();
Copy link
Contributor

@zoewangg zoewangg Feb 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to track downstream subscription; if there's no demand from downstream, we shouldn't call subscriber.onNext

Example: https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java

It's going to be super complex and hard to maintain if we add that logic... Let me see if we can reuse some of the logic in some way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, let me know if you think of something better

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, can't think of another way, we just need to implement it :(

super.onSubscribe(subscription);
this.subscription = subscription;
publisherToStorage.subscribe(storage);
subscription.request(maximumBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number is actually number of ByteBuffer instead of number of bytes.

- SplittingTransformer builder
- DelegatingBufferingSubscriber protected API
- remove spec comments
- refactor emit method
@Override
public void onNext(ByteBuffer byteBuffer) {
while (bufferOverflows(byteBuffer)) {
flushStorageToDelegate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, can't think of another way, we just need to implement it :(


@Override
public void onNext(ByteBuffer byteBuffer) {
while (bufferOverflows(byteBuffer)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should trigger a flush as long as we have data instead of buffer is full. See how FlatteningSubscriber is implemented https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java#L81

Move common logic for FlatteningSubscriber and DelegatingBufferingSubscriber to new class AbstractFlatteningSubscriber
@L-Applin
Copy link
Contributor Author

To answer most concerns about DelegatingBufferingSubscriber, it was modified to reuse the logic in FlatteningSubscriber for its implementation

Copy link

sonarcloud bot commented Feb 12, 2024

Quality Gate Failed Quality Gate failed

Failed conditions
79.0% Coverage on New Code (required ≥ 80%)
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarCloud

idea Catch issues before they fail your Quality Gate with our IDE extension SonarLint SonarLint

@L-Applin L-Applin merged commit 8c18e39 into feature/master/s3mpu Feb 12, 2024
15 of 16 checks passed
* @param maximumBufferSize the amount of data buffered and the size of the chunk of data
* @return an instance of this builder
*/
public Builder<ResponseT, ResultT> maximumBufferSize(long maximumBufferSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be Long?

*/
@Override
protected boolean onNextNeeded() {
return super.onNextNeeded() && storage.peek().isPresent() && storage.peek().get().type() == ON_NEXT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing super.onNextNeeded(), suggesting using template methods:

In the parent class:

    private boolean onNextNeeded() {
        return downstreamDemand.get() > 0 && additionalOnNextNeededCheck();
    }

/**
  Can be overridden by subclasses to provide additional checks
*/
    protected boolean additionalOnNextNeededCheck() {
        return true;
    }

*/
private final StoringSubscriber<ByteBuffer> storage = new StoringSubscriber<>(Integer.MAX_VALUE);

public DelegatingBufferingSubscriber(long maximumBuffer, Subscriber<? super ByteBuffer> subscriber) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a protected API, we should use builder


public DelegatingBufferingSubscriber(long maximumBuffer, Subscriber<? super ByteBuffer> subscriber) {
super(subscriber);
this.maximumBuffer = maximumBuffer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maximumBufferInBytes


@Override
public void onNext(ByteBuffer item) {
if (currentlyBuffered.get() + item.remaining() > maximumBuffer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should trigger a flush as long as we have data instead of buffer is full.

* Returns true if we need to call onNext downstream. If this is executed outside the handling-state-update condition, the
* result is subject to change.
*/
protected boolean onNextNeeded() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my previous comment, suggesting template methods to allow subclasses to add additional checks. Can we make those methods that are intended to be overridden by subclasses package private?

* result is subject to change.
*/
protected boolean onCompleteNeeded() {
return onCompleteCalledByUpstream && !terminalCallMadeDownstream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, suggesting template methods.

    private boolean onCompleteNeeded() {
        return onCompleteCalledByUpstream && !terminalCallMadeDownstream && additionalOnCompleteNeededCheck() ;
   } 

   boolean additionalOnCompleteNeededCheck() {
     return true;
 } 

*
* @param item the value with which onNext was called.
*/
protected abstract void doWithItem(T item);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a better naming for this one, can we make this package private so that we can change it in the future?

import software.amazon.awssdk.utils.Logger;

@SdkInternalApi
public abstract class AbstractFlatteningSubscriber<T, U> extends DelegatingSubscriber<T, U> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a protected API. nit: this no longer does flattening, maybe BaseSubscriberConverter/Adaptor?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants