-
Notifications
You must be signed in to change notification settings - Fork 853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add publisher wrapping when handling async checksums #4554
Add publisher wrapping when handling async checksums #4554
Conversation
@SdkInternalApi | ||
public final class ChecksumPublisher implements Publisher<ByteBuffer> { | ||
private final Publisher<ByteBuffer> checksummingPublisher; | ||
private final CompletableFuture<Void> signal = new CompletableFuture<>(); | ||
|
||
public ChecksumPublisher(Publisher<ByteBuffer> publisher, Collection<? extends Checksum> consumers) { | ||
this.checksummingPublisher = subscriber -> { | ||
publisher.subscribe(new ChecksumSubscriber(subscriber, consumers, signal)); | ||
}; | ||
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super ByteBuffer> subscriber) { | ||
checksummingPublisher.subscribe(subscriber); | ||
} | ||
|
||
public CompletableFuture<Void> checksum() { | ||
return signal; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about:
@SdkInternalApi
public final class ChecksumPublisher implements Publisher<ByteBuffer> {
private final CompletableFuture<Void> signal = new CompletableFuture<>();
private final Publisher<ByteBuffer> publisher;
private final Collection<? extends Checksum> consumers;
public ChecksumPublisher(Publisher<ByteBuffer> publisher, Collection<? extends Checksum> consumers) {
this.publisher = publisher;
this.consumers = consumers;
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
publisher.subscribe(new ChecksumSubscriber(subscriber, consumers, signal));
}
public CompletableFuture<Void> checksum() {
return signal;
}
}
You might also want to add validation that subscribe()
is only called once. If it gets called again (allowed for publishers), weird stuff will happen since we're reusing signal
.
byte[] buf; | ||
if (byteBuffer.hasArray()) { | ||
buf = byteBuffer.array(); | ||
} else { | ||
buf = new byte[byteBuffer.remaining()]; | ||
byteBuffer.get(buf); | ||
} | ||
// We have to use a byte[], since update(<ByteBuffer>) is java 9+ | ||
checksums.forEach(checksum -> checksum.update(buf, 0, buf.length)); | ||
|
||
subscriber.onNext(byteBuffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we copy the default implementation of update(ByteBuffer)
from Java 9+? It honors the bounds of the byte buffer properly. This implementation doesn't seem to reset the buffer when byteBuffer.hasArray()
is false, either. Here's a version where I use update(ByteBuffer)
from Java 9+ and reset the buffer after using it:
@SdkInternalApi
public final class ChecksumSubscriber extends DelegatingSubscriber<ByteBuffer, ByteBuffer> {
private final CompletableFuture<Void> signal;
private final byte[] copyBuffer;
private final Collection<Checksum> checksums = new ArrayList<>();
public ChecksumSubscriber(Subscriber<? super ByteBuffer> subscriber,
Collection<? extends Checksum> consumers,
CompletableFuture<Void> signal) {
super(subscriber);
this.checksums.addAll(consumers);
this.signal = signal;
}
@Override
public void onNext(ByteBuffer byteBuffer) {
updateChecksumsAndReset(byteBuffer);
subscriber.onNext(byteBuffer);
}
private void updateChecksumsAndReset(ByteBuffer buffer) {
int position = buffer.position();
int limit = buffer.limit();
int remaining = limit - position;
if (remaining <= 0) {
return;
}
if (buffer.hasArray()) {
checksums.forEach(c -> c.update(buffer.array(), position + buffer.arrayOffset(), remaining));
} else {
if (copyBuffer == null) {
copyBuffer = new byte[4096];
}
while (buffer.hasRemaining()) {
int length = Math.min(buffer.remaining(), copyBuffer.length);
buffer.get(copyBuffer, 0, length);
checksums.forEach(c -> c.update(copyBuffer, 0, length));
}
}
buffer.position(position);
buffer.limit(limit);
}
@Override
public void onError(Throwable t) {
super.onError(t);
signal.completeExceptionally(t);
}
@Override
public void onComplete() {
super.onComplete();
signal.complete(null);
}
}
Closed in favor of #4567 |
Motivation and Context
Modifications
Testing
Screenshots (if appropriate)
Types of changes
Checklist
mvn install
succeedsscripts/new-change
script and following the instructions. Commit the new file created by the script in.changes/next-release
with your changes.License