diff --git a/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java b/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java index 91798cf668e7b1..b42d0515464f62 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java +++ b/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java @@ -25,7 +25,6 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.PathStrippable; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import com.google.protobuf.ByteString; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -239,13 +238,6 @@ public byte[] atomicallyWriteTo(Path outputPath) throws IOException { return super.atomicallyWriteTo(outputPath); } - @Override - public ByteString getBytes() throws IOException { - ByteString.Output out = ByteString.newOutput(); - writeTo(out); - return out.toByteString(); - } - @Override public String getExecPathString() { return paramFileExecPath.getPathString(); diff --git a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java index bd5c123fa8c5cc..e292666e0abb40 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java +++ b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java @@ -117,8 +117,15 @@ protected byte[] writeTo(Path target) throws IOException { /** * Gets a {@link ByteString} representation of the fake file. Used to avoid copying if the fake * file is internally represented as a {@link ByteString}. + * + *

Prefer {@link #writeTo} to this method to avoid materializing the entire file in memory. The + * return value should not be retained. */ - public abstract ByteString getBytes() throws IOException; + public ByteString getBytes() throws IOException { + ByteString.Output out = ByteString.newOutput(); + writeTo(out); + return out.toByteString(); + } /** * Returns the metadata for this input if available. Null otherwise. @@ -174,7 +181,7 @@ public void writeTo(OutputStream out) throws IOException { } @Override - public ByteString getBytes() throws IOException { + public ByteString getBytes() { return ByteString.EMPTY; } diff --git a/src/main/java/com/google/devtools/build/lib/exec/BUILD b/src/main/java/com/google/devtools/build/lib/exec/BUILD index 44c2e51d933819..156a5ae39d161b 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/BUILD +++ b/src/main/java/com/google/devtools/build/lib/exec/BUILD @@ -59,7 +59,6 @@ java_library( "//third_party:error_prone_annotations", "//third_party:guava", "//third_party:jsr305", - "@com_google_protobuf//:protobuf_java", ], ) @@ -287,6 +286,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/remote/options", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/java/com/google/devtools/build/lib/util/io:io-proto", "//src/main/java/com/google/devtools/build/lib/vfs", @@ -298,6 +298,7 @@ java_library( "//third_party:jsr305", "@com_google_protobuf//:protobuf_java", "@com_google_protobuf//:protobuf_java_util", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", "@zstd-jni", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/exec/BinTools.java b/src/main/java/com/google/devtools/build/lib/exec/BinTools.java index 655ef4d02fe29c..3ab79f4f71b953 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/BinTools.java +++ b/src/main/java/com/google/devtools/build/lib/exec/BinTools.java @@ -30,7 +30,6 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.Symlinks; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import com.google.protobuf.ByteString; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -201,13 +200,6 @@ protected byte[] atomicallyWriteTo(Path outputPath) throws IOException { return digest; } - @Override - public ByteString getBytes() throws IOException { - ByteString.Output out = ByteString.newOutput(); - writeTo(out); - return out.toByteString(); - } - @Override public FileArtifactValue getMetadata() throws IOException { // We intentionally delay hashing until it is necessary. diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java index 383d10948d8345..094fcca7f0f6f5 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java +++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java @@ -35,6 +35,7 @@ import com.google.devtools.build.lib.exec.Protos.EnvironmentVariable; import com.google.devtools.build.lib.exec.Protos.Platform; import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.lib.vfs.DigestUtils; import com.google.devtools.build.lib.vfs.FileStatus; @@ -167,11 +168,9 @@ protected Digest computeDigest( if (input != null) { if (input instanceof VirtualActionInput virtualActionInput) { - byte[] blob = virtualActionInput.getBytes().toByteArray(); - return builder - .setHash(digestHashFunction.getHashFunction().hashBytes(blob).toString()) - .setSizeBytes(blob.length) - .build(); + build.bazel.remote.execution.v2.Digest digest = + DigestUtil.compute(virtualActionInput, digestHashFunction.getHashFunction()); + return builder.setHash(digest.getHash()).setSizeBytes(digest.getSizeBytes()).build(); } // Try to obtain a digest from the input metadata. diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index e71133ad0a28f2..e0fc26f51cc5e3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -108,6 +108,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/remote/zstd", "//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value", "//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions", diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index aea5383cb2a6e3..d2b9a92aeea49b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -16,8 +16,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction; -import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; -import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; @@ -50,9 +48,6 @@ import io.grpc.stub.ClientResponseObserver; import io.netty.util.ReferenceCounted; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; @@ -104,54 +99,6 @@ final class ByteStreamUploader { this.digestFunction = digestFunction; } - @VisibleForTesting - ReferenceCountedChannel getChannel() { - return channel; - } - - @VisibleForTesting - RemoteRetrier getRetrier() { - return retrier; - } - - /** - * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. - * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. - * - *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is - * transparent to the user of this API. - * - * @param digest the digest of the data to upload. - * @param chunker the data to upload. - * @throws IOException when reading of the {@link Chunker}s input source fails - */ - public void uploadBlob(RemoteActionExecutionContext context, Digest digest, Chunker chunker) - throws IOException, InterruptedException { - getFromFuture(uploadBlobAsync(context, digest, chunker)); - } - - /** - * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks - * until the upload of all BLOBs is complete, or throws an {@link - * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors. - * - *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is - * transparent to the user of this API. - * - * @param chunkers the data to upload. - * @throws IOException when reading of the {@link Chunker}s input source or uploading fails - */ - public void uploadBlobs(RemoteActionExecutionContext context, Map chunkers) - throws IOException, InterruptedException { - List> uploads = new ArrayList<>(); - - for (Map.Entry chunkerEntry : chunkers.entrySet()) { - uploads.add(uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue())); - } - - waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true); - } - /** * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns * immediately and one can listen to the returned future for the success/failure of the upload. @@ -163,7 +110,7 @@ public void uploadBlobs(RemoteActionExecutionContext context, Map uploadBlobAsync( RemoteActionExecutionContext context, Digest digest, Chunker chunker) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index 12c43bb37969d1..e1da153fe243e4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -22,11 +22,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputHelper; -import com.google.devtools.build.lib.actions.cache.VirtualActionInput; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; -import com.google.devtools.build.lib.vfs.Path; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; @@ -46,7 +43,7 @@ * *

This class should not be extended - it's only non-final for testing. */ -public class Chunker { +public class Chunker implements AutoCloseable { private static int defaultChunkSize = 1024 * 16; @@ -100,12 +97,7 @@ public int hashCode() { } } - /** A supplier that provide data as {@link InputStream}. */ - public interface ChunkDataSupplier { - InputStream get() throws IOException; - } - - private final ChunkDataSupplier dataSupplier; + private final Blob blob; private final long uncompressedSize; private final int chunkSize; private final Chunk emptyChunk; @@ -120,9 +112,8 @@ public interface ChunkDataSupplier { // lazily on the first call to next(), as opposed to opening it in the constructor or on reset(). private boolean initialized; - Chunker( - ChunkDataSupplier dataSupplier, long uncompressedSize, int chunkSize, boolean compressed) { - this.dataSupplier = checkNotNull(dataSupplier); + Chunker(Blob blob, long uncompressedSize, int chunkSize, boolean compressed) { + this.blob = checkNotNull(blob); this.uncompressedSize = uncompressedSize; this.chunkSize = chunkSize; this.emptyChunk = new Chunk(ByteString.EMPTY, 0); @@ -143,7 +134,7 @@ public long getUncompressedSize() { *

Closes any open resources (file handles, ...). */ public void reset() throws IOException { - close(); + closeInput(); offset = 0; initialized = false; } @@ -168,7 +159,7 @@ public void seek(long toOffset) throws IOException { initialize(toOffset); } if (uncompressedSize > 0 && data.finished()) { - close(); + closeInput(); } } @@ -180,7 +171,7 @@ public boolean hasNext() { } /** Closes the input stream and reset chunk cache */ - private void close() throws IOException { + private void closeInput() throws IOException { if (data != null) { data.close(); data = null; @@ -188,6 +179,12 @@ private void close() throws IOException { chunkCache = null; } + @Override + public void close() throws IOException { + reset(); + blob.close(); + } + /** Attempts reading at most a full chunk and stores it in the chunkCache buffer */ private int read() throws IOException { int count = 0; @@ -217,7 +214,7 @@ public Chunk next() throws IOException { maybeInitialize(); if (uncompressedSize == 0) { - close(); + closeInput(); return emptyChunk; } @@ -249,7 +246,7 @@ public Chunk next() throws IOException { // or the guard in getActualSize won't work. offset += bytesRead; if (data.finished()) { - close(); + closeInput(); } return new Chunk(blob, offsetBefore); @@ -268,7 +265,7 @@ private void initialize(long srcPos) throws IOException { checkState(offset == 0); checkState(chunkCache == null); try { - var src = dataSupplier.get(); + var src = blob.get(); ByteStreams.skipFully(src, srcPos); data = compressed @@ -294,49 +291,23 @@ public static class Builder { private int chunkSize = getDefaultChunkSize(); protected long size; private boolean compressed; - protected ChunkDataSupplier inputStream; - - @CanIgnoreReturnValue - public Builder setInput(byte[] data) { - checkState(inputStream == null); - size = data.length; - setInputSupplier(() -> new ByteArrayInputStream(data)); - return this; - } + protected Blob inputStream; @CanIgnoreReturnValue - public Builder setInput(long size, InputStream in) { + public Builder setInput(long size, Blob in) { checkState(inputStream == null); checkNotNull(in); this.size = size; - inputStream = () -> in; - return this; - } - - @CanIgnoreReturnValue - public Builder setInput(long size, Path file) { - checkState(inputStream == null); - this.size = size; - inputStream = file::getInputStream; - return this; - } - - @CanIgnoreReturnValue - public Builder setInput(long size, ActionInput actionInput, Path execRoot) { - checkState(inputStream == null); - this.size = size; - if (actionInput instanceof VirtualActionInput virtualActionInput) { - inputStream = () -> virtualActionInput.getBytes().newInput(); - } else { - inputStream = () -> ActionInputHelper.toInputPath(actionInput, execRoot).getInputStream(); - } + inputStream = in; return this; } @CanIgnoreReturnValue @VisibleForTesting - protected final Builder setInputSupplier(ChunkDataSupplier inputStream) { - this.inputStream = inputStream; + public Builder setInput(byte[] data) { + checkState(inputStream == null); + size = data.length; + this.inputStream = () -> new ByteArrayInputStream(data); return this; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index b8f8c111ed453e..51eea5526b1824 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -59,7 +59,6 @@ import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.remote.util.Utils; import com.google.devtools.build.lib.remote.zstd.ZstdDecompressingOutputStream; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.grpc.Channel; import io.grpc.Status; @@ -483,26 +482,14 @@ private void releaseOut() { return future; } - @Override - public ListenableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path path) { - return uploadChunker( - context, - digest, - Chunker.builder() - .setInput(digest.getSizeBytes(), path) - .setCompressed(shouldCompress(digest)) - .build()); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { + RemoteActionExecutionContext context, Digest digest, Blob blob) { return uploadChunker( context, digest, Chunker.builder() - .setInput(data.toByteArray()) + .setInput(digest.getSizeBytes(), blob) .setCompressed(shouldCompress(digest)) .build()); } @@ -513,10 +500,10 @@ ListenableFuture uploadChunker( f.addListener( () -> { try { - chunker.reset(); + chunker.close(); } catch (IOException e) { logger.atWarning().withCause(e).log( - "failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); + "failed to close chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); } }, MoreExecutors.directExecutor()); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index ccd8eba26b3d08..fc90073279294b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -27,12 +27,14 @@ import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; @@ -40,13 +42,15 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.disk.DiskCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; -import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; import com.google.devtools.build.lib.vfs.Path; +import com.google.protobuf.ByteString; import com.google.protobuf.Message; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Completable; @@ -59,6 +63,7 @@ import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.subjects.AsyncSubject; import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -172,6 +177,35 @@ public void ensureInputsPresent( } } + private static final class VirtualActionInputBlob implements Blob { + private VirtualActionInput virtualActionInput; + // Can be large compared to the retained size of the VirtualActionInput and thus shouldn't be + // kept in memory for an extended period of time. + private volatile ByteString data; + + VirtualActionInputBlob(VirtualActionInput virtualActionInput) { + this.virtualActionInput = Preconditions.checkNotNull(virtualActionInput); + } + + @Override + public InputStream get() throws IOException { + if (data == null) { + synchronized (this) { + if (data == null) { + data = Preconditions.checkNotNull(virtualActionInput, "used after close()").getBytes(); + } + } + } + return data.newInput(); + } + + @Override + public void close() { + virtualActionInput = null; + data = null; + } + } + private ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, @@ -183,25 +217,28 @@ private ListenableFuture uploadBlob( return remoteCacheClient.uploadBlob(context, digest, node.toByteString()); } - PathOrBytes file = merkleTree.getFileByDigest(digest); + ContentSource file = merkleTree.getFileByDigest(digest); if (file != null) { - if (file.getBytes() != null) { - return remoteCacheClient.uploadBlob(context, digest, file.getBytes()); - } - - var path = checkNotNull(file.getPath()); - try { - if (remotePathChecker.isRemote(context, path)) { - // If we get here, the remote input was determined to exist in the remote or disk cache at - // some point before action execution, but reported to be missing when querying the remote - // for missing action inputs; possibly because it was evicted in the interim. - reporter.post(new LostInputsEvent(digest)); - throw new CacheNotFoundException(digest, path.getPathString()); + return switch (file) { + case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) -> + remoteCacheClient.uploadBlob( + context, digest, new VirtualActionInputBlob(virtualActionInput)); + case ContentSource.PathSource(Path path) -> { + try { + if (remotePathChecker.isRemote(context, path)) { + // If we get here, the remote input was determined to exist in the remote or disk + // cache at some point before action execution, but reported to be missing when + // querying the remote for missing action inputs; possibly because it was evicted in + // the interim. + reporter.post(new LostInputsEvent(digest)); + throw new CacheNotFoundException(digest, path.getPathString()); + } + } catch (IOException e) { + yield immediateFailedFuture(e); + } + yield remoteCacheClient.uploadFile(context, digest, path); } - } catch (IOException e) { - return immediateFailedFuture(e); - } - return remoteCacheClient.uploadFile(context, digest, path); + }; } Message message = additionalInputs.get(digest); diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 4ed37e5a6e58d5..d96c601803ffac 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -22,7 +22,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; +import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Set; @@ -112,25 +114,56 @@ ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out); /** - * Uploads a {@code file} to the CAS. + * A supplier for the data comprising a BLOB. + * + *

As blobs can be large and may need to be kept in memory, consumers should call {@link #get} + * as late as possible and close the blob as soon as they are done with it. + */ + @FunctionalInterface + interface Blob extends Closeable { + /** Get an input stream for the blob's data. Can be called multiple times. */ + InputStream get() throws IOException; + + @Override + default void close() {} + } + + /** + * Uploads a {@code file} BLOB to the CAS. * * @param context the context for the action. * @param digest The digest of the file. * @param file The file to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadFile(RemoteActionExecutionContext context, Digest digest, Path file); + default ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { + return uploadBlob(context, digest, () -> new LazyFileInputStream(file)); + } + + /** + * Uploads a blob to the CAS. + * + * @param context the context for the action. + * @param digest The digest of the blob. + * @param blob A supplier for the blob to upload. May be called multiple times, but is closed by + * the implementation after the upload is complete. + * @return A future representing pending completion of the upload. + */ + ListenableFuture uploadBlob(RemoteActionExecutionContext context, Digest digest, Blob blob); /** - * Uploads a BLOB to the CAS. + * Uploads an in-memory BLOB to the CAS. * * @param context the context for the action. * @param digest The digest of the blob. * @param data The BLOB to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data); + default ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { + return uploadBlob(context, digest, data::newInput); + } /** Close resources associated with the remote cache. */ void close(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD b/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD index 6b5d3d8d05e4fc..962a5388d0ddf2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD @@ -21,6 +21,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/server:idle_task", "//src/main/java/com/google/devtools/build/lib/util:file_system_lock", "//src/main/java/com/google/devtools/build/lib/vfs", diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD index 7ea7e0ef886e96..b25b40b7d4bd12 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD @@ -21,12 +21,11 @@ java_library( deps = [ "//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info", "//src/main/java/com/google/devtools/build/lib/authandtls", - "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/util", - "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//third_party:auth", "//third_party:flogger", "//third_party:guava", diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index e6a661ddbe357a..d2cc5382ac71e0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -31,13 +31,11 @@ import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.LazyFileInputStream; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.util.DigestOutputStream; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.Utils; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -641,7 +639,7 @@ private ListenableFuture uploadAsync( public void close() { // Ensure that the InputStream can't be closed somewhere in the Netty // pipeline, so that we can support retries. The InputStream is closed in - // the finally block below. + // the listener block below. } }; UploadCommand upload = new UploadCommand(uri, casUpload, key, wrappedIn, length); @@ -718,25 +716,17 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path file) { - return retrier.executeAsync( - () -> - uploadAsync( - digest.getHash(), - digest.getSizeBytes(), - new LazyFileInputStream(file), - /* casUpload= */ true)); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { + RemoteActionExecutionContext context, Digest digest, Blob blob) { return retrier.executeAsync( - () -> - uploadAsync( - digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true)); + () -> { + var result = + uploadAsync( + digest.getHash(), digest.getSizeBytes(), blob.get(), /* casUpload= */ true); + result.addListener(blob::close, MoreExecutors.directExecutor()); + return result; + }); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD index 63610c304eb536..00f5b343ae3cf4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD @@ -21,13 +21,13 @@ java_library( "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/remote:scrubber", - "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/util:string_encoding", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", + "//third_party:error_prone_annotations", "//third_party:guava", "//third_party:jsr305", - "@com_google_protobuf//:protobuf_java", "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java index cd3862e35d842a..c24763b7d0f8a8 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java @@ -16,10 +16,10 @@ import build.bazel.remote.execution.v2.Digest; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import com.google.protobuf.ByteString; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -73,7 +73,7 @@ public boolean equals(Object o) { static class FileNode extends Node { private final Path path; - private final ByteString data; + private final VirtualActionInput virtualActionInput; private final Digest digest; private final boolean isExecutable; private final boolean toolInput; @@ -96,15 +96,19 @@ static FileNode createExecutable( } static FileNode createExecutable( - String pathSegment, ByteString data, Digest digest, boolean toolInput) { - return new FileNode(pathSegment, data, digest, /* isExecutable= */ true, toolInput); + String pathSegment, + VirtualActionInput virtualActionInput, + Digest digest, + boolean toolInput) { + return new FileNode( + pathSegment, virtualActionInput, digest, /* isExecutable= */ true, toolInput); } private FileNode( String pathSegment, Path path, Digest digest, boolean isExecutable, boolean toolInput) { super(pathSegment); this.path = Preconditions.checkNotNull(path, "path"); - this.data = null; + this.virtualActionInput = null; this.digest = Preconditions.checkNotNull(digest, "digest"); this.isExecutable = isExecutable; this.toolInput = toolInput; @@ -112,13 +116,13 @@ private FileNode( private FileNode( String pathSegment, - ByteString data, + VirtualActionInput input, Digest digest, boolean isExecutable, boolean toolInput) { super(pathSegment); this.path = null; - this.data = Preconditions.checkNotNull(data, "data"); + this.virtualActionInput = Preconditions.checkNotNull(input, "data"); this.digest = Preconditions.checkNotNull(digest, "digest"); this.isExecutable = isExecutable; this.toolInput = toolInput; @@ -132,8 +136,8 @@ Path getPath() { return path; } - ByteString getBytes() { - return data; + VirtualActionInput getVirtualActionInput() { + return virtualActionInput; } public boolean isExecutable() { @@ -146,7 +150,8 @@ boolean isToolInput() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), path, data, digest, toolInput, isExecutable); + return Objects.hash( + super.hashCode(), path, virtualActionInput, digest, toolInput, isExecutable); } @Override @@ -154,7 +159,7 @@ public boolean equals(Object o) { if (o instanceof FileNode other) { return super.equals(other) && Objects.equals(path, other.path) - && Objects.equals(data, other.data) + && Objects.equals(virtualActionInput, other.virtualActionInput) && Objects.equals(digest, other.digest) && toolInput == other.toolInput && isExecutable == other.isExecutable; diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java index 7b63c14e9a09c7..60fab395147e3f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java @@ -157,10 +157,7 @@ private static int buildFromActionInputs( boolean childAdded = currDir.addChild( FileNode.createExecutable( - path.getBaseName(), - virtualActionInput.getBytes(), - d, - toolInputs.contains(path))); + path.getBaseName(), virtualActionInput, d, toolInputs.contains(path))); return childAdded ? 1 : 0; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java index b3e965986c3ad9..db6f3b90c276e4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java @@ -32,13 +32,15 @@ import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ArtifactPathResolver; import com.google.devtools.build.lib.actions.InputMetadataProvider; +import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.Scrubber.SpawnScrubber; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource.PathSource; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource.VirtualActionInputSource; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -55,31 +57,14 @@ public class MerkleTree { private static final String BAZEL_TOOL_INPUT_MARKER = "bazel_tool_input"; - /** A path or contents */ - public static class PathOrBytes { + /** A source of a file's content. */ + public sealed interface ContentSource { + /** Content provided by an actual file. */ + record PathSource(Path path) implements ContentSource {} - private final Path path; - private final ByteString bytes; - - public PathOrBytes(Path path) { - this.path = Preconditions.checkNotNull(path, "path"); - this.bytes = null; - } - - public PathOrBytes(ByteString bytes) { - this.bytes = Preconditions.checkNotNull(bytes, "bytes"); - this.path = null; - } - - @Nullable - public Path getPath() { - return path; - } - - @Nullable - public ByteString getBytes() { - return bytes; - } + /** Content provided by a virtual action input. */ + record VirtualActionInputSource(VirtualActionInput virtualActionInput) + implements ContentSource {} } private interface MerkleTreeDirectoryVisitor { @@ -95,7 +80,8 @@ private interface MerkleTreeDirectoryVisitor { } private Map digestDirectoryMap; - private Map digestFileMap; + // Object is an unwrapped ContentSource to reduce retained memory when caching MerkleTrees. + private Map digestFileMap; @Nullable private final Directory rootProto; private final Digest rootDigest; private final SortedSet files; @@ -177,13 +163,17 @@ private Map getDigestDirectoryMap() { return this.digestDirectoryMap; } - private Map getDigestFileMap() { + private Map getDigestFileMap() { if (this.digestFileMap == null) { - Map newDigestMap = Maps.newHashMap(); + Map newDigestMap = Maps.newHashMap(); visitTree( (dir) -> { for (DirectoryTree.FileNode file : dir.getFiles()) { - newDigestMap.put(file.getDigest(), toPathOrBytes(file)); + if (file.getPath() != null) { + newDigestMap.put(file.getDigest(), file.getPath()); + } else { + newDigestMap.put(file.getDigest(), file.getVirtualActionInput()); + } } }); this.digestFileMap = newDigestMap; @@ -197,8 +187,15 @@ public Directory getDirectoryByDigest(Digest digest) { } @Nullable - public PathOrBytes getFileByDigest(Digest digest) { - return getDigestFileMap().get(digest); + public ContentSource getFileByDigest(Digest digest) { + return switch (getDigestFileMap().get(digest)) { + case Path path -> new PathSource(path); + case VirtualActionInput virtualActionInput -> + new VirtualActionInputSource(virtualActionInput); + case null -> null; + default -> + throw new IllegalStateException("Unexpected value: " + getDigestFileMap().get(digest)); + }; } /** @@ -420,10 +417,4 @@ private static SymlinkNode buildProto(DirectoryTree.SymlinkNode symlink) { .setTarget(internalToUnicode(symlink.getTarget())) .build(); } - - private static PathOrBytes toPathOrBytes(DirectoryTree.FileNode file) { - return file.getPath() != null - ? new PathOrBytes(file.getPath()) - : new PathOrBytes(file.getBytes()); - } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD index 2d5c1f81dd1eef..835c02662ba71b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD @@ -13,8 +13,15 @@ filegroup( java_library( name = "util", - srcs = glob(["*.java"]), + srcs = glob( + ["*.java"], + exclude = [ + "DigestOutputStream.java", + "DigestUtil.java", + ], + ), deps = [ + ":digest_utils", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", @@ -27,10 +34,8 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/options", - "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/main/protobuf:failure_details_java_proto", - "//src/main/protobuf:spawn_java_proto", "//third_party:guava", "//third_party:jsr305", "//third_party:rxjava3", @@ -42,3 +47,20 @@ java_library( "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", ], ) + +java_library( + name = "digest_utils", + srcs = [ + "DigestOutputStream.java", + "DigestUtil.java", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib/actions", + "//src/main/java/com/google/devtools/build/lib/remote/common", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/protobuf:spawn_java_proto", + "//third_party:guava", + "@com_google_protobuf//:protobuf_java", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java index a3332f8e5e73b2..0a6763e6a815c1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java @@ -21,6 +21,7 @@ import build.bazel.remote.execution.v2.DigestFunction; import com.google.common.collect.ImmutableSet; import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; import com.google.common.io.BaseEncoding; import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; @@ -30,7 +31,6 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.XattrProvider; import com.google.protobuf.Message; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; @@ -91,10 +91,19 @@ public Digest compute(Path path, FileStatus status) throws IOException { DigestUtils.getDigestWithManualFallback(path, xattrProvider, status), status.getSize()); } + public static Digest compute(VirtualActionInput input, HashFunction hashFunction) + throws IOException { + // Stream the virtual action input as parameter files, which can be very large, are lazily + // computed from the in-memory CommandLine object. This avoids allocating large byte arrays. + try (DigestOutputStream digestOutputStream = + new DigestOutputStream(hashFunction, OutputStream.nullOutputStream())) { + input.writeTo(digestOutputStream); + return digestOutputStream.digest(); + } + } + public Digest compute(VirtualActionInput input) throws IOException { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - input.writeTo(buffer); - return compute(buffer.toByteArray()); + return compute(input, hashFn.getHashFunction()); } /** diff --git a/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java b/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java index a3c5763a830b69..2a501d323ae36b 100644 --- a/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java +++ b/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java @@ -21,8 +21,10 @@ */ public interface StreamWriter { /** - * Writes the fake file to an OutputStream. MUST be deterministic, in that multiple calls to - * write the same StreamWriter must write identical bytes. + * Writes the fake file to an OutputStream. MUST be deterministic, in that multiple calls to write + * the same StreamWriter must write identical bytes. + * + * @throws IOException only if out throws an IOException */ void writeTo(OutputStream out) throws IOException; } diff --git a/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java b/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java index 2abbaa101be52e..738edb36fc0c7d 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java +++ b/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java @@ -96,7 +96,6 @@ import com.google.devtools.build.skyframe.SkyFunction.Environment; import com.google.devtools.build.skyframe.SkyFunctionName; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import com.google.protobuf.ByteString; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayDeque; @@ -318,13 +317,6 @@ public static VirtualActionInput createVirtualActionInput(String relativePath, S /** Creates a {@link VirtualActionInput} with given string as contents and provided path. */ public static VirtualActionInput createVirtualActionInput(PathFragment path, String contents) { return new VirtualActionInput() { - @Override - public ByteString getBytes() throws IOException { - ByteString.Output out = ByteString.newOutput(); - writeTo(out); - return out.toByteString(); - } - @Override public String getExecPathString() { return path.getPathString(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index 545e415d330b64..0afa13bb3aca7a 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -122,6 +122,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/runtime/commands", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/lib/testing/vfs:spied_filesystem", @@ -242,7 +243,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/authandtls/credentialhelper:credential_module", "//src/main/java/com/google/devtools/build/lib/dynamic", "//src/main/java/com/google/devtools/build/lib/remote", - "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/util:os", "//src/main/java/com/google/devtools/build/lib/vfs", @@ -268,7 +269,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/events", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote:store", - "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 9b0d421c1f7442..fc0c2d06c17b4a 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -15,6 +15,8 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.truth.Truth.assertThat; +import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; +import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; @@ -42,6 +44,7 @@ import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -179,7 +182,7 @@ public void singleBlobUploadShouldWork() throws Exception { serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob)); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -242,7 +245,7 @@ public void onCompleted() {} } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -357,7 +360,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test triggers one retry. Mockito.verify(mockBackoff, Mockito.times(1)) @@ -473,7 +476,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) .isTrue(); @@ -540,7 +543,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -597,7 +600,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. assertThat(numWriteCalls.get()).isEqualTo(1); @@ -668,7 +671,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should have triggered a single retry, because it made // no progress. @@ -693,7 +696,8 @@ public void earlyWriteResponseShouldCompleteUpload() throws Exception { // provide only enough data to write a single chunk InputStream in = new ByteArrayInputStream(blob, 0, CHUNK_SIZE); - Chunker chunker = Chunker.builder().setInput(blob.length, in).setChunkSize(CHUNK_SIZE).build(); + Chunker chunker = + Chunker.builder().setInput(blob.length, () -> in).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); serviceRegistry.addService( @@ -706,7 +710,7 @@ public StreamObserver write(StreamObserver streamOb } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -757,7 +761,7 @@ public void onCompleted() { }); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { // expected @@ -796,7 +800,7 @@ public StreamObserver write(StreamObserver streamOb } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -829,7 +833,7 @@ public void multipleBlobsUploadShouldWork() throws Exception { serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); } @Test @@ -857,7 +861,8 @@ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exceptio + " --bep_maximum_open_remote_upload_files flag to a number lower than your system" + " default (run 'ulimit -a' for *nix-based operating systems). Original error message:" + " Too many open files"; - assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker))) + assertThat( + assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker))) .hasMessageThat() .isEqualTo(newMessage); } @@ -899,7 +904,7 @@ public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAll serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles); } @@ -935,7 +940,7 @@ public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception { serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); assertThat(uploader.getOpenedFilePermits()).isNull(); } @@ -1132,7 +1137,7 @@ public ServerCall.Listener interceptCall( } })); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1163,7 +1168,7 @@ public StreamObserver write(StreamObserver response }); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue(); @@ -1205,7 +1210,7 @@ public StreamObserver write(StreamObserver response Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class); @@ -1253,7 +1258,7 @@ public void onCompleted() { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1297,7 +1302,7 @@ public void onCompleted() { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1332,7 +1337,7 @@ public StreamObserver write(StreamObserver response Digest digest = DIGEST_UTIL.compute(blob); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(numCalls.get()).isEqualTo(1); @@ -1387,7 +1392,7 @@ public StreamObserver write(StreamObserver streamOb } }); - assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)); + assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker)); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1472,7 +1477,7 @@ public void onCompleted() { } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1538,7 +1543,7 @@ public void queryWriteStatus( Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); assertThat(numUploads.get()).isEqualTo(1); } @@ -1628,7 +1633,7 @@ public void onCompleted() { Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -1636,6 +1641,53 @@ public void onCompleted() { assertThat(numUploads.get()).isEqualTo(1); } + /** + * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. + * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. + * + *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. + * + * @param digest the digest of the data to upload. + * @param chunker the data to upload. + * @throws IOException when reading of the {@link Chunker}s input source fails + */ + private static void uploadBlob( + ByteStreamUploader byteStreamUploader, + RemoteActionExecutionContext context, + Digest digest, + Chunker chunker) + throws IOException, InterruptedException { + getFromFuture(byteStreamUploader.uploadBlobAsync(context, digest, chunker)); + } + + /** + * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks + * until the upload of all BLOBs is complete, or throws an {@link + * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors. + * + *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. + * + * @param chunkers the data to upload. + * @throws IOException when reading of the {@link Chunker}s input source or uploading fails + */ + private static void uploadBlobs( + ByteStreamUploader byteStreamUploader, + RemoteActionExecutionContext context, + Map chunkers) + throws IOException, InterruptedException { + List> uploads = new ArrayList<>(); + + for (Map.Entry chunkerEntry : chunkers.entrySet()) { + uploads.add( + byteStreamUploader.uploadBlobAsync( + context, chunkerEntry.getKey(), chunkerEntry.getValue())); + } + + waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true); + } + private static class NoopStreamObserver implements StreamObserver { @Override public void onNext(WriteRequest writeRequest) {} @@ -1763,7 +1815,7 @@ public void queryWriteStatus( /* Custom Chunker used to track number of open files */ private static class TestChunker extends Chunker { - TestChunker(ChunkDataSupplier dataSupplier, long size, int chunkSize, boolean compressed) { + TestChunker(Blob dataSupplier, long size, int chunkSize, boolean compressed) { super(dataSupplier, size, chunkSize, compressed); } @@ -1782,7 +1834,8 @@ private static class TestChunkerBuilder extends Chunker.Builder { public Chunker.Builder setInput(byte[] existingData) { checkState(this.inputStream == null); this.size = existingData.length; - return setInputSupplier( + return setInput( + existingData.length, () -> new TestByteArrayInputStream(existingData, customFileTracker)); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index 7c693480df1482..42c159cb562df3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -18,7 +18,7 @@ import com.github.luben.zstd.Zstd; import com.google.devtools.build.lib.remote.Chunker.Chunk; -import com.google.devtools.build.lib.remote.Chunker.ChunkDataSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -94,7 +94,7 @@ public void close() throws IOException { super.close(); } }; - Chunker chunker = Chunker.builder().setInput(0, inp).build(); + Chunker chunker = Chunker.builder().setInput(0, () -> inp).build(); assertThat(chunker.hasNext()).isTrue(); @@ -136,7 +136,7 @@ public void resourcesShouldBeReleased() throws IOException { byte[] data = new byte[] {1, 2}; final AtomicReference in = new AtomicReference<>(); - ChunkDataSupplier supplier = + RemoteCacheClient.Blob supplier = () -> { in.set(Mockito.spy(new ByteArrayInputStream(data))); return in.get(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java index 251d04a8ec5176..3d9fa052e1ddce 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java @@ -55,6 +55,7 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -397,7 +398,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -472,7 +473,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -553,7 +554,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { Path file = invocationOnMock.getArgument(2, Path.class); @@ -652,7 +653,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) .uploadFile(any(), any(), any()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 100e069b6ab4d1..1836221831f2bb 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -101,6 +101,7 @@ import com.google.devtools.build.lib.remote.RemoteScrubbing.Config; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.common.RemotePathResolver; import com.google.devtools.build.lib.remote.common.RemotePathResolver.DefaultRemotePathResolver; @@ -2162,7 +2163,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep return future; }) .when(cache.remoteCacheClient) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Blob) any()); ActionInput input = ActionInputHelper.fromPath("inputs/foo"); fakeFileCache.createScratchInput(input, "input-foo"); RemoteExecutionService service = newRemoteExecutionService(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD b/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD index 7f09d8d86a2202..fa5cf2b4008345 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD @@ -23,6 +23,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs/bazel", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD index 2d450d41cf3423..3dee6d7f751131 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD @@ -31,6 +31,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/downloader", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//src/test/java/com/google/devtools/build/lib/remote/util", diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD index 16914a57fa8091..91b6c87d52e66e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD @@ -28,6 +28,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//src/test/java/com/google/devtools/build/lib:test_runner", diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD index 3720b4bb0a5777..c457ccccccfd2a 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD @@ -19,7 +19,7 @@ java_test( test_class = "com.google.devtools.build.lib.AllTests", deps = [ "//src/main/java/com/google/devtools/build/lib/remote/logging", - "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/protobuf:remote_execution_log_java_proto", "//src/test/java/com/google/devtools/build/lib:test_runner", diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java b/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java index eb665aca0b67b8..f6a88f43525500 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java @@ -86,7 +86,7 @@ public void virtualActionInputShouldWork() throws Exception { FileNode expectedFooNode = FileNode.createExecutable("foo.cc", foo.getPath(), digestUtil.computeAsUtf8("foo")); FileNode expectedBarNode = - FileNode.createExecutable("bar.cc", bar.getBytes(), digestUtil.computeAsUtf8("bar"), false); + FileNode.createExecutable("bar.cc", bar, digestUtil.computeAsUtf8("bar"), false); assertThat(fileNodesAtDepth(tree, 0)).isEmpty(); assertThat(fileNodesAtDepth(tree, 1)).containsExactly(expectedFooNode, expectedBarNode); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD b/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD index 5724817f1d0d02..fe0d91a5faf6c4 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD @@ -27,7 +27,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/clock", "//src/main/java/com/google/devtools/build/lib/remote/merkletree", - "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java index 6c91c85bde8517..84de07eed3de4b 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java @@ -30,6 +30,7 @@ import com.google.devtools.build.lib.actions.StaticInputMetadataProvider; import com.google.devtools.build.lib.actions.util.ActionsTestUtil; import com.google.devtools.build.lib.clock.JavaClock; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.lib.vfs.FileSystem; @@ -138,10 +139,14 @@ public void buildMerkleTree() throws IOException { digestUtil.computeAsUtf8("buzz"), digestUtil.computeAsUtf8("fizzbuzz") }; - assertThat(tree.getFileByDigest(inputDigests[0]).getPath()).isEqualTo(foo.getPath()); - assertThat(tree.getFileByDigest(inputDigests[1]).getPath()).isEqualTo(bar.getPath()); - assertThat(tree.getFileByDigest(inputDigests[2]).getPath()).isEqualTo(buzz.getPath()); - assertThat(tree.getFileByDigest(inputDigests[3]).getPath()).isEqualTo(fizzbuzz.getPath()); + assertThat(tree.getFileByDigest(inputDigests[0])) + .isEqualTo(new ContentSource.PathSource(foo.getPath())); + assertThat(tree.getFileByDigest(inputDigests[1])) + .isEqualTo(new ContentSource.PathSource(bar.getPath())); + assertThat(tree.getFileByDigest(inputDigests[2])) + .isEqualTo(new ContentSource.PathSource(buzz.getPath())); + assertThat(tree.getFileByDigest(inputDigests[3])) + .isEqualTo(new ContentSource.PathSource(fizzbuzz.getPath())); Digest[] allDigests = Iterables.toArray(tree.getAllDigests(), Digest.class); assertThat(allDigests.length).isEqualTo(dirDigests.length + inputDigests.length); diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index f9de62f38d6bb8..466fa953e64f5f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -20,7 +20,6 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy; import com.google.common.collect.ImmutableSet; -import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -28,10 +27,7 @@ import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.devtools.build.lib.vfs.Path; -import com.google.protobuf.ByteString; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.AbstractMap.SimpleEntry; import java.util.Map; @@ -143,22 +139,11 @@ public ListenableFuture uploadActionResult( return Futures.immediateFuture(null); } - @Override - public ListenableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path file) { - try (InputStream in = file.getInputStream()) { - cas.put(digest, ByteStreams.toByteArray(in)); - } catch (IOException e) { - return Futures.immediateFailedFuture(e); - } - return Futures.immediateFuture(null); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { - try (InputStream in = data.newInput()) { - cas.put(digest, data.toByteArray()); + RemoteActionExecutionContext context, Digest digest, Blob blob) { + try (blob) { + cas.put(digest, blob.get().readAllBytes()); } catch (IOException e) { return Futures.immediateFailedFuture(e); } diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD index e40a00034cd743..606ff0b7b89c38 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD @@ -40,6 +40,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/sandbox:linux_sandbox_command_line_builder", "//src/main/java/com/google/devtools/build/lib/shell", "//src/main/java/com/google/devtools/build/lib/util:os", diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 0fecbb33ec7091..dc66ee31ec9d25 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -34,6 +34,7 @@ import io.grpc.Status; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; import java.util.UUID; @@ -83,11 +84,12 @@ public void read(ReadRequest request, StreamObserver responseObser try { // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. - Chunker c = - Chunker.builder().setInput(getFromFuture(cache.downloadBlob(context, digest))).build(); - while (c.hasNext()) { - responseObserver.onNext( - ReadResponse.newBuilder().setData(c.next().getData()).build()); + byte[] bytes = getFromFuture(cache.downloadBlob(context, digest)); + try (Chunker c = + Chunker.builder().setInput(bytes.length, () -> new ByteArrayInputStream(bytes)).build()) { + while (c.hasNext()) { + responseObserver.onNext(ReadResponse.newBuilder().setData(c.next().getData()).build()); + } } responseObserver.onCompleted(); } catch (CacheNotFoundException e) {