From 5555522688eb6c82caeb3352fb87e0321887b456 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Mon, 13 Jan 2025 11:29:24 +0100 Subject: [PATCH] Make Chunker closeable --- .../build/lib/remote/ByteStreamUploader.java | 2 +- .../devtools/build/lib/remote/Chunker.java | 37 +++++++++---------- .../build/lib/remote/GrpcCacheClient.java | 9 ++--- .../lib/remote/RemoteExecutionCache.java | 10 +++-- .../devtools/build/lib/remote/common/BUILD | 1 - .../lib/remote/common/RemoteCacheClient.java | 23 ++++++++---- .../lib/remote/http/HttpCacheClient.java | 6 +-- .../lib/remote/merkletree/MerkleTree.java | 1 + .../lib/remote/ByteStreamUploaderTest.java | 4 +- .../build/lib/remote/ChunkerTest.java | 2 +- .../build/lib/remote/CombinedCacheTest.java | 12 +++--- .../remote/RemoteExecutionServiceTest.java | 4 +- .../lib/remote/util/InMemoryCacheClient.java | 6 +-- .../build/remote/worker/ByteStreamServer.java | 10 ++--- 14 files changed, 65 insertions(+), 62 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index c7602543032709..d2b9a92aeea49b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -110,7 +110,7 @@ final class ByteStreamUploader { * performed. This is transparent to the user of this API. * * @param digest the {@link Digest} of the data to upload. - * @param chunker the data to upload. + * @param chunker the data to upload. Callers are responsible for closing the {@link Chunker}. */ public ListenableFuture uploadBlobAsync( RemoteActionExecutionContext context, Digest digest, Chunker chunker) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index 01479bbffa6425..e1da153fe243e4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -22,12 +22,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.PushbackInputStream; @@ -44,7 +43,7 @@ * *

This class should not be extended - it's only non-final for testing. */ -public class Chunker { +public class Chunker implements AutoCloseable { private static int defaultChunkSize = 1024 * 16; @@ -98,7 +97,7 @@ public int hashCode() { } } - private final CloseableBlobSupplier dataSupplier; + private final Blob blob; private final long uncompressedSize; private final int chunkSize; private final Chunk emptyChunk; @@ -113,12 +112,8 @@ public int hashCode() { // lazily on the first call to next(), as opposed to opening it in the constructor or on reset(). private boolean initialized; - Chunker( - CloseableBlobSupplier dataSupplier, - long uncompressedSize, - int chunkSize, - boolean compressed) { - this.dataSupplier = checkNotNull(dataSupplier); + Chunker(Blob blob, long uncompressedSize, int chunkSize, boolean compressed) { + this.blob = checkNotNull(blob); this.uncompressedSize = uncompressedSize; this.chunkSize = chunkSize; this.emptyChunk = new Chunk(ByteString.EMPTY, 0); @@ -139,7 +134,7 @@ public long getUncompressedSize() { *

Closes any open resources (file handles, ...). */ public void reset() throws IOException { - close(); + closeInput(); offset = 0; initialized = false; } @@ -164,7 +159,7 @@ public void seek(long toOffset) throws IOException { initialize(toOffset); } if (uncompressedSize > 0 && data.finished()) { - close(); + closeInput(); } } @@ -176,7 +171,7 @@ public boolean hasNext() { } /** Closes the input stream and reset chunk cache */ - private void close() throws IOException { + private void closeInput() throws IOException { if (data != null) { data.close(); data = null; @@ -184,8 +179,10 @@ private void close() throws IOException { chunkCache = null; } - public void closeQuietly() { - dataSupplier.close(); + @Override + public void close() throws IOException { + reset(); + blob.close(); } /** Attempts reading at most a full chunk and stores it in the chunkCache buffer */ @@ -217,7 +214,7 @@ public Chunk next() throws IOException { maybeInitialize(); if (uncompressedSize == 0) { - close(); + closeInput(); return emptyChunk; } @@ -249,7 +246,7 @@ public Chunk next() throws IOException { // or the guard in getActualSize won't work. offset += bytesRead; if (data.finished()) { - close(); + closeInput(); } return new Chunk(blob, offsetBefore); @@ -268,7 +265,7 @@ private void initialize(long srcPos) throws IOException { checkState(offset == 0); checkState(chunkCache == null); try { - var src = dataSupplier.get(); + var src = blob.get(); ByteStreams.skipFully(src, srcPos); data = compressed @@ -294,10 +291,10 @@ public static class Builder { private int chunkSize = getDefaultChunkSize(); protected long size; private boolean compressed; - protected CloseableBlobSupplier inputStream; + protected Blob inputStream; @CanIgnoreReturnValue - public Builder setInput(long size, CloseableBlobSupplier in) { + public Builder setInput(long size, Blob in) { checkState(inputStream == null); checkNotNull(in); this.size = size; diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 308b2f3ec5b5dc..51eea5526b1824 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -484,12 +484,12 @@ private void releaseOut() { @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) { + RemoteActionExecutionContext context, Digest digest, Blob blob) { return uploadChunker( context, digest, Chunker.builder() - .setInput(digest.getSizeBytes(), data) + .setInput(digest.getSizeBytes(), blob) .setCompressed(shouldCompress(digest)) .build()); } @@ -500,11 +500,10 @@ ListenableFuture uploadChunker( f.addListener( () -> { try { - chunker.reset(); - chunker.closeQuietly(); + chunker.close(); } catch (IOException e) { logger.atWarning().withCause(e).log( - "failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); + "failed to close chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); } }, MoreExecutors.directExecutor()); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index d4dbe7a14b7332..e0bf3a2f5c5b3c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -42,7 +42,7 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.disk.DiskCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource; @@ -177,11 +177,13 @@ public void ensureInputsPresent( } } - private static final class VirtualActionInputDataSupplier implements CloseableBlobSupplier { + private static final class VirtualActionInputBlob implements Blob { private VirtualActionInput virtualActionInput; + // Can be large compared to the retained size of the VirtualActionInput and thus shouldn't be + // kept in memory for an extended period of time. private volatile ByteString data; - VirtualActionInputDataSupplier(VirtualActionInput virtualActionInput) { + VirtualActionInputBlob(VirtualActionInput virtualActionInput) { this.virtualActionInput = Preconditions.checkNotNull(virtualActionInput); } @@ -220,7 +222,7 @@ private ListenableFuture uploadBlob( return switch (file) { case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) -> remoteCacheClient.uploadBlob( - context, digest, new VirtualActionInputDataSupplier(virtualActionInput)); + context, digest, new VirtualActionInputBlob(virtualActionInput)); case ContentSource.PathSource(Path path) -> { try { if (remotePathChecker.isRemote(context, path)) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD index c04252a26ca373..f7f60e8e60428f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD @@ -56,7 +56,6 @@ java_library( "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/exec:spawn_input_expander", "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", - "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//third_party:guava", diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 416c01c0f6dcbe..bacb90bf0e0159 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -20,9 +20,9 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -113,9 +113,16 @@ ListenableFuture uploadActionResult( ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out); + /** + * A supplier for the data comprising a BLOB. + * + *

As blobs can be large and may need to be kept in memory, consumers should call {@link #get} + * as late as possible and close the blob as soon as they are done with it. + */ @FunctionalInterface - interface CloseableBlobSupplier extends SilentCloseable { - InputStream get(); + interface Blob extends Closeable { + /** Get an input stream for the blob's data. Can be called multiple times. */ + InputStream get() throws IOException; @Override default void close() {} @@ -126,14 +133,14 @@ default void close() {} * * @param context the context for the action. * @param digest The digest of the blob. - * @param data A supplier for the data to upload. May be called multiple times. + * @param blob A supplier for the blob to upload. May be called multiple times, but is closed by + * the implementation after the upload is complete. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data); + ListenableFuture uploadBlob(RemoteActionExecutionContext context, Digest digest, Blob blob); /** - * Uploads a {@code file} to the CAS. + * Uploads a {@code file} BLOB to the CAS. * * @param context the context for the action. * @param digest The digest of the file. @@ -146,7 +153,7 @@ default ListenableFuture uploadFile( } /** - * Uploads a BLOB to the CAS. + * Uploads an in-memory BLOB to the CAS. * * @param context the context for the action. * @param digest The digest of the blob. diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 644a38fc91ea82..37ca1bfc639024 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -718,12 +718,12 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier in) { + RemoteActionExecutionContext context, Digest digest, Blob blob) { return retrier.executeAsync( () -> { var result = - uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true); - result.addListener(in::close, MoreExecutors.directExecutor()); + uploadAsync(digest.getHash(), digest.getSizeBytes(), blob.get(), /* casUpload= */ true); + result.addListener(blob::close, MoreExecutors.directExecutor()); return result; }); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java index 707c6bb8e5acf6..0457fe04573910 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java @@ -57,6 +57,7 @@ public class MerkleTree { private static final String BAZEL_TOOL_INPUT_MARKER = "bazel_tool_input"; + /** A source of a file's content */ public sealed interface ContentSource { record PathSource(Path path) implements ContentSource {} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 0986a00ce2ec19..fc0c2d06c17b4a 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -44,7 +44,7 @@ import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -1815,7 +1815,7 @@ public void queryWriteStatus( /* Custom Chunker used to track number of open files */ private static class TestChunker extends Chunker { - TestChunker(CloseableBlobSupplier dataSupplier, long size, int chunkSize, boolean compressed) { + TestChunker(Blob dataSupplier, long size, int chunkSize, boolean compressed) { super(dataSupplier, size, chunkSize, compressed); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index 758717dfba6dd4..42c159cb562df3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -136,7 +136,7 @@ public void resourcesShouldBeReleased() throws IOException { byte[] data = new byte[] {1, 2}; final AtomicReference in = new AtomicReference<>(); - RemoteCacheClient.CloseableBlobSupplier supplier = + RemoteCacheClient.Blob supplier = () -> { in.set(Mockito.spy(new ByteArrayInputStream(data))); return in.get(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java index 4157bb8bee9023..3d9fa052e1ddce 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java @@ -55,7 +55,7 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -74,7 +74,6 @@ import com.google.devtools.common.options.Options; import com.google.protobuf.ByteString; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Deque; @@ -90,7 +89,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -400,7 +398,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -475,7 +473,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -556,7 +554,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { Path file = invocationOnMock.getArgument(2, Path.class); @@ -655,7 +653,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) .uploadFile(any(), any(), any()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 6cc79b95098738..1836221831f2bb 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -101,7 +101,7 @@ import com.google.devtools.build.lib.remote.RemoteScrubbing.Config; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.common.RemotePathResolver; import com.google.devtools.build.lib.remote.common.RemotePathResolver.DefaultRemotePathResolver; @@ -2163,7 +2163,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep return future; }) .when(cache.remoteCacheClient) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); ActionInput input = ActionInputHelper.fromPath("inputs/foo"); fakeFileCache.createScratchInput(input, "input-foo"); RemoteExecutionService service = newRemoteExecutionService(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index 01a3870d69cfed..466fa953e64f5f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -141,9 +141,9 @@ public ListenableFuture uploadActionResult( @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) { - try { - cas.put(digest, data.get().readAllBytes()); + RemoteActionExecutionContext context, Digest digest, Blob blob) { + try (blob) { + cas.put(digest, blob.get().readAllBytes()); } catch (IOException e) { return Futures.immediateFailedFuture(e); } diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 15080fa9280d3f..dc66ee31ec9d25 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -85,11 +85,11 @@ public void read(ReadRequest request, StreamObserver responseObser // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. byte[] bytes = getFromFuture(cache.downloadBlob(context, digest)); - Chunker c = - Chunker.builder().setInput(bytes.length, () -> new ByteArrayInputStream(bytes)).build(); - while (c.hasNext()) { - responseObserver.onNext( - ReadResponse.newBuilder().setData(c.next().getData()).build()); + try (Chunker c = + Chunker.builder().setInput(bytes.length, () -> new ByteArrayInputStream(bytes)).build()) { + while (c.hasNext()) { + responseObserver.onNext(ReadResponse.newBuilder().setData(c.next().getData()).build()); + } } responseObserver.onCompleted(); } catch (CacheNotFoundException e) {