Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Progress Listener Invocation methods to Asynchronous and Synchronous code paths #5044

Open
wants to merge 70 commits into
base: feature/anirudkr-progress-listener
Choose a base branch
from

Conversation

anirudh9391
Copy link
Contributor

Asynchronous request Path to include Progress Listener invocations to track transactions

Motivation and Context

This is a parity feature gap between 1.x and 2.x

Modifications

Added callbacks :

  • requestPrepared
  • requestBytesSent
  • responseHeaderReceived
  • responseBytesReceived

in asynchronous code path.

Testing

Unit Tests

Screenshots (if appropriate)

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • [ x] New feature (non-breaking change which adds functionality)

Checklist

  • [x ] I have read the CONTRIBUTING document
  • [x ] Local run of mvn install succeeds
  • [ x] My code follows the code style of this project
  • [x ] My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • [ x] I have added tests to cover my changes
  • [x ] All new and existing tests passed
  • I have added a changelog entry. Adding a new entry must be accomplished by running the scripts/new-change script and following the instructions. Commit the new file created by the script in .changes/next-release with your changes.
  • My change is to implement 1.11 parity feature and I have updated LaunchChangelog

License

  • [ x] I confirm that this pull request can be released under the Apache 2 license

…ure context objects. Added progress listener method to RequestOverrideConfiguration
@anirudh9391 anirudh9391 changed the title Add Progress Listener Invocation methods to Asynchronous code paths Add Progress Listener Invocation methods to Asynchronous and Synchronous code paths Jul 16, 2024
*/
@SdkInternalApi
public final class BytesReadTrackingPublisher implements Publisher<ByteBuffer> {
public final class BytesReadTrackingPublisher implements SdkHttpContentPublisher {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we changed this class to implement SdkHttpContentPublisher?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is to enable the ByteReadTrackingPublisher to track the AsyncBody publisher of type SdkHttpContentPublisher :

requestProvider = ProgressListenerUtils.wrapRequestProviderWithByteTrackingIfProgressListenerAttached(requestProvider,
. The same class, ByteReadTrackingPublisher is used to track requests and response

Comment on lines 59 to 62
ContentStreamProvider.fromInputStream(new BytesReadTrackingInputStream(
AbortableInputStream.create(contentStreamProvider.newStream()),
new AtomicLong(0L),
new UploadProgressUpdaterInvocation(progressUpdater)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit hard to read, let's create variables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the existing implementation passes RESPONSE_BYTES_READ to BytesReadTrackingInputStream and now we are passing 0L, is this expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RESPONSE_BYTES_READ is used to account for "The running count of bytes in the response body that have been read by the client." Here, we are using it for an upload context but the same BytesReadTrackingInputStream is made use of. Hence, the 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will break existing code because we will later read context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ) to get the counter

if (ProgressListenerUtils.progressListenerAttached(context.originalRequest())) {
Long requestContentLength =
(context.requestProvider() != null && context.requestProvider().contentLength().isPresent()) ?
context.requestProvider().contentLength().get() : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems requestProvider() returns AsyncRequestBody which is for async code path, what about sync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is only to track the bytes published in the Async request code path; I still have not found a way to track the sync req. Wil put that out in the next pr


public SdkExchangeProgress requestBodyProgress() {
return requestBodyProgress;
@SdkPublicApi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be internal API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

@@ -68,6 +72,10 @@ public int read(byte[] b) throws IOException {
private void updateBytesRead(long read) {
if (read > 0) {
bytesRead.addAndGet(read);

if (progressUpdaterInvoker != null) {
progressUpdaterInvoker.incrementBytesTransferred(bytesRead.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be incrementBytesTransferred(read)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

* ProgressUpdater exposes methods that invokes listener methods to update and store request progress state
*/
@SdkInternalApi
public class DeafultProgressUpdater implements ProgressUpdater {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: typo, DefaultProgressUpdater

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Comment on lines 26 to 32
public SdkExchangeProgress requestBodyProgress() {
return null;
}

public SdkExchangeProgress responseBodyProgress() {
return null;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this

import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater;

@SdkInternalApi
public class ResponseProgressUpdaterInvocation implements ProgressUpdaterInvoker {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResponseProgressUpdaterInvoker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed, good suggestion

@@ -189,8 +190,7 @@ default void responseBytesReceived(Context.ResponseBytesReceived context) {
/**
* For Expect: 100-continue embedded requests, the service returning anything other than 100 continue
* indicates a request failure. This method captures the error in the payload
* After this, either executionFailure or requestHeaderSent will always be invoked depending on
* whether the error type is retryable or not
* After this it will either be an executionFailure or a request retry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfinished sentence

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i meant it as a finishing sentence, modified it to be clearer.

public class BytesSentTrackingPublisherTest {

@Test
public void test_updatesBytesSent() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_ prefix is really not needed. Let's follow our unit tests naming pattern

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

Comment on lines 59 to 62
ContentStreamProvider.fromInputStream(new BytesReadTrackingInputStream(
AbortableInputStream.create(contentStreamProvider.newStream()),
new AtomicLong(0L),
new UploadProgressUpdaterInvocation(progressUpdater)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will break existing code because we will later read context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ) to get the counter


public static SdkHttpContentPublisher wrapRequestProviderWithByteTrackingIfProgressListenerAttached(
SdkHttpContentPublisher requestProvider, ProgressUpdater progressUpdater) {
return new BytesReadTrackingPublisher(requestProvider, new AtomicLong(0L),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, new AtomicLong(0L), will break existing code path because the SDK relies on SdkInternalExecutionAttribute.RESPONSE_BYTES_READ to contain the counter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm.. Im not modifying the context attribute SdkInternalExecutionAttribute.RESPONSE_BYTES_READ here. While wrapping the request provider(while sending the request) with BytesReadTracking publisher im setting bytes read to 0, as we have not yet made the request at that point. This should not modify what we set on the response

Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarCloud

Catch issues before they fail your Quality Gate with our IDE extension SonarLint

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants