diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e9dd94ff4ed88..6172e9e0a65b7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -79,7 +79,11 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_GET_LAYOUT_ON_OPEN; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READ_VECTORED_PARALLEL; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_ENABLE_GET_LAYOUT_ON_OPEN; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_ENABLE_READ_VECTORED_PARALLEL; /** * Configuration for Azure Blob FileSystem. 
@@ -303,6 +307,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ALWAYS_READ_BUFFER_SIZE) private boolean alwaysReadBufferSize; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_GET_LAYOUT_ON_OPEN, + DefaultValue = DEFAULT_ENABLE_GET_LAYOUT_ON_OPEN) + private boolean enableGetLayoutOnOpen; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH, DefaultValue = DEFAULT_ENABLE_FLUSH) private boolean enableFlush; @@ -490,6 +498,8 @@ public class AbfsConfiguration{ private long sasTokenRenewPeriodForStreamsInSeconds; @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ENABLE_READ_VECTORED_PARALLEL, DefaultValue = DEFAULT_ENABLE_READ_VECTORED_PARALLEL) + private boolean enableReadVectoredParallel; @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR) private boolean enableAbfsListIterator; @@ -1218,6 +1228,10 @@ public long getSasTokenRenewPeriodForStreamsInSeconds() { return this.sasTokenRenewPeriodForStreamsInSeconds; } + public boolean isReadVectoredParallelEnabled() { + return this.enableReadVectoredParallel; + } + public String getAzureBlockLocationHost() { return this.azureBlockLocationHost; } @@ -1298,6 +1312,10 @@ public boolean shouldReadBufferSizeAlways() { return this.alwaysReadBufferSize; } + public boolean isGetLayoutOnOpenEnabled() { + return this.enableGetLayoutOnOpen; + } + public boolean isFlushEnabled() { return this.enableFlush; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index de4bc79d55aa8..8e9fe72e350f7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -19,6 +19,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.lang.reflect.InvocationTargetException; @@ -890,13 +891,14 @@ public AbfsInputStream openFileForRead(Path path, long contentLength; ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance(); /* - * GetPathStatus API has to be called in case of: - * 1. fileStatus is null or not an object of VersionedFileStatus: as eTag - * would not be there in the fileStatus object. - * 2. fileStatus is an object of VersionedFileStatus and the object doesn't - * have encryptionContext field when client's encryptionType is - * ENCRYPTION_CONTEXT. - */ + * GetPathStatus API has to be called in case of: + * 1. fileStatus is null or not an object of VersionedFileStatus: as eTag + * would not be there in the fileStatus object. + * 2. fileStatus is an object of VersionedFileStatus and the object doesn't + * have encryptionContext field when client's encryptionType is + * ENCRYPTION_CONTEXT. + */ + byte[] layout = null; if ((fileStatus instanceof VersionedFileStatus) && ( getClient().getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT || ((VersionedFileStatus) fileStatus).getEncryptionContext() @@ -917,9 +919,24 @@ public AbfsInputStream openFileForRead(Path path, encryptionContext.getBytes(StandardCharsets.UTF_8)); } } else { - AbfsHttpOperation op = getClient().getPathStatus(relativePath, false, - tracingContext, null).getResult(); - resourceType = getClient().checkIsDir(op) ? 
DIRECTORY : FILE; + AbfsHttpOperation op; + if (abfsConfiguration.isGetLayoutOnOpenEnabled()) { + AbfsRestOperation restOp = getClient().getLayoutOperation(relativePath, tracingContext); + op = restOp.getResult(); + InputStream stream = op.getListResultStream(); + if (stream != null) { + int size = stream.available(); + layout = new byte[size]; + int read = stream.read(layout); + if (read != size) { + LOG.warn("Could not read all bytes from list result stream"); + } + } + } else { + op = getClient().getPathStatus(relativePath, false, + tracingContext, null).getResult(); + } + resourceType = /*getClient().checkIsDir(op) ? DIRECTORY :*/ FILE; contentLength = extractContentLength(op); eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); /* @@ -954,7 +971,7 @@ public AbfsInputStream openFileForRead(Path path, return new AbfsInputStream(getClient(), statistics, relativePath, contentLength, populateAbfsInputStreamContext( parameters.map(OpenFileParameters::getOptions), - contextEncryptionAdapter), + contextEncryptionAdapter).withLayout(layout), eTag, tracingContext); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 918997ab43b01..b45133adbe225 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -197,6 +197,7 @@ public final class AbfsHttpConstants { public enum ApiVersion { DEC_12_2019("2019-12-12"), + FEB_10_2020("2020-02-10"), APR_10_2021("2021-04-10"), AUG_03_2023("2023-08-03"), NOV_04_2024("2024-11-04"), diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index c5eb9235fbb54..66662ff92c708 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -266,6 +266,8 @@ public final class ConfigurationKeys { public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize"; public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize"; + /** Enable issuing GetLayout requests when files are opened to retrieve blob layout metadata. */ + public static final String FS_AZURE_ENABLE_GET_LAYOUT_ON_OPEN = "fs.azure.enable.getLayoutOnOpen"; /** Provides a config control to enable or disable ABFS Flush operations - * HFlush and HSync. Default is true. **/ public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush"; @@ -485,6 +487,8 @@ public static String containerProperty(String property, String fsName, String ac /** Key for SAS token provider: {@value}. **/ public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = "fs.azure.sas.token.provider.type"; + public static final String FS_AZURE_ENABLE_READ_VECTORED_PARALLEL = "fs.azure.enable.read.vectored.parallel"; + /** For performance, AbfsInputStream/AbfsOutputStream re-use SAS tokens until the expiry is within this number of seconds. 
**/ public static final String FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS = "fs.azure.sas.token.renew.period.for.streams"; @@ -546,7 +550,7 @@ public static String containerProperty(String property, String fsName, String ac public static final String FS_AZURE_BLOB_COPY_MAX_WAIT_MILLIS = "fs.azure.blob.copy.max.wait.millis"; /**Blob rename lease refresh duration: {@value}*/ public static final String FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION - = "fs.azure.blob.atomic.rename.lease.refresh.duration"; + = "fs.azure.blob.atomic.rename.lease.refresh.duration"; /**Maximum number of blob information enqueued in memory for rename or delete orchestration: {@value}*/ public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.blob.dir.list.producer.queue.max.size"; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 6f76f2e033c06..2017cacc03032 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -103,6 +103,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB + public static final boolean DEFAULT_ENABLE_GET_LAYOUT_ON_OPEN = false; public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB @@ -157,6 +158,7 @@ public final class FileSystemConfigurations { public static final boolean 
DEFAULT_ENABLE_CHECK_ACCESS = true; public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; + public static final boolean DEFAULT_ENABLE_READ_VECTORED_PARALLEL = false; public static final boolean DEFAULT_ENABLE_READAHEAD = true; public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false; @@ -240,7 +242,7 @@ public final class FileSystemConfigurations { * Default buffer option: {@value}. */ public static final String DATA_BLOCKS_BUFFER_DEFAULT = - DATA_BLOCKS_BYTEBUFFER; + DATA_BLOCKS_BYTEBUFFER; /** * IO rate limit. Value: {@value} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index 9521518fa1f17..78d5ca43c77d9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -139,5 +139,7 @@ public final class HttpHeaderConfigurations { */ public static final String X_MS_CLIENT_TRANSACTION_ID = "x-ms-client-transaction-id"; + public static final String X_MS_BLOB_LAYOUT = "x-ms-blob-layout"; + private HttpHeaderConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index db3e163ca4d5f..1ab903854558c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -82,6 +82,7 @@ import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.fs.azurebfs.utils.EncryptionType; 
import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; @@ -111,6 +112,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH_ENCODE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; @@ -981,6 +983,96 @@ public abstract AbfsRestOperation read(String path, ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) throws AzureBlobFileSystemException; + /** + * Read the contents of the file at specified path with layout. + * @param path of the file to be read. + * @param position in the file from where data has to be read. + * @param buffer to store the data read. + * @param bufferOffset offset in the buffer to start storing the data. + * @param bufferLength length of data to be read. + * @param eTag to specify conditional headers. + * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @param dataKeys byte array containing the data keys. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + public AbfsRestOperation readWithLayout(String path, + long position, + byte[] buffer, + int bufferOffset, + int bufferLength, + String eTag, + String cachedSasToken, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext, + byte[] dataKeys, + String endpoint) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(ApiVersion.NOV_04_2024); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + AbfsHttpHeader rangeHeader = new AbfsHttpHeader("x-ms-range", + String.format("bytes=%d-%d", position, position + bufferLength - 1)); + requestHeaders.add(rangeHeader); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + // Add request priority header for prefetch reads + addRequestPriorityForPrefetch(requestHeaders, tracingContext); + + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, + SASTokenProvider.READ_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + URL url; + String rootUrl = ""; + try { + if (endpoint != null && !endpoint.isEmpty()) { + rootUrl = endpoint; + } else { + URL blobUrl = UriUtils.changeUrlFromDfsToBlob(baseUrl); + String blobUrlStr = blobUrl.toString(); + // Remove filesystem path. 
baseUrl usually is https://account.dfs.core.windows.net/filesystem + // We want https://account.blob.core.windows.net/$decoder + + // Find the start of the path + int pathStart = blobUrlStr.indexOf("/", blobUrlStr.indexOf("//") + 2); + if (pathStart != -1) { + rootUrl = blobUrlStr.substring(0, pathStart); + } else { + rootUrl = blobUrlStr; + } + } + + url = new URL(rootUrl + "/$decoder" + abfsUriQueryBuilder.toString()); + } catch (MalformedURLException e) { + throw new InvalidUriException(rootUrl); + } + + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.ReadFile, + AbfsHttpConstants.HTTP_METHOD_POST, + url, + requestHeaders, + buffer, + bufferOffset, + bufferLength, + dataKeys, + 0, + dataKeys.length, + sasTokenForReuse); + op.execute(tracingContext); + + // Verify the MD5 hash returned by server holds valid on the data received. + if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { + verifyCheckSumForRead(buffer, op.getResult(), bufferOffset); + } + + return op; + } + /** * Delete the file or directory at specified path. * @param path to be deleted. @@ -1663,6 +1755,49 @@ AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType requestHeaders, sasTokenForReuse, abfsConfiguration); } + /** + * Creates an AbfsRestOperation with additional parameters for buffer, SAS token and request body. + * + * @param operationType The type of the operation. + * @param httpMethod The HTTP method of the operation. + * @param url The URL associated with the operation. + * @param requestHeaders The list of HTTP headers for the request. + * @param buffer The byte buffer containing data for the operation. + * @param bufferOffset The offset within the buffer where the data starts. + * @param bufferLength The length of the data within the buffer. + * @param requestBody The byte buffer containing request body. + * @param requestBodyOffset The offset within the request body where the data starts. 
+ * @param requestBodyLength The length of the data within the request body. + * @param sasTokenForReuse The SAS token for reusing authentication. + * @return An AbfsRestOperation instance. + */ + AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType, + final String httpMethod, + final URL url, + final List requestHeaders, + final byte[] buffer, + final int bufferOffset, + final int bufferLength, + final byte[] requestBody, + final int requestBodyOffset, + final int requestBodyLength, + final String sasTokenForReuse) { + return new AbfsRestOperation( + operationType, + this, + httpMethod, + url, + requestHeaders, + buffer, + bufferOffset, + bufferLength, + requestBody, + requestBodyOffset, + requestBodyLength, + sasTokenForReuse, + abfsConfiguration); + } + @VisibleForTesting AbfsApacheHttpClient getAbfsApacheHttpClient() { return abfsApacheHttpClient; @@ -1751,6 +1886,72 @@ public abstract Hashtable getXMSProperties(AbfsHttpOperation res */ public abstract String decodeAttribute(byte[] value) throws UnsupportedEncodingException; + /** + * Get the layout of the blob. + * @param path path of the blob. + * @param tracingContext for tracing the server calls. + * @return byte array containing the layout. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + public byte[] getLayout(String path, TracingContext tracingContext) throws AzureBlobFileSystemException { + AbfsRestOperation op = getLayoutOperation(path, tracingContext); + AbfsHttpOperation result = op.getResult(); + InputStream stream = result.getListResultStream(); + if (stream == null) { + return null; + } + + try { + int size = stream.available(); + byte[] buffer = new byte[size]; + int read = stream.read(buffer); + if (read != size) { + LOG.warn("Could not read all bytes from list result stream"); + } + return buffer; + } catch (IOException e) { + throw new AbfsRestOperationException( + result.getStatusCode(), + result.getStorageErrorCode(), + result.getStorageErrorMessage(), + e, + result); + } + } + + /** + * Issues and executes the GET request (carrying the x-ms-blob-layout header) that retrieves the blob layout. + * @param path path of the blob. + * @param tracingContext for tracing the server calls. + * @return executed AbfsRestOperation whose response body carries the layout. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public AbfsRestOperation getLayoutOperation(String path, TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(ApiVersion.NOV_04_2024); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_BLOB_LAYOUT, "true")); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.RANGE, "bytes=0-")); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + appendSASTokenToQuery(path, SASTokenProvider.GET_PROPERTIES_OPERATION, + abfsUriQueryBuilder); + + URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + try { + url = UriUtils.changeUrlFromDfsToBlob(url); + } catch (InvalidUriException e) { + LOG.debug("Failed to convert DFS URL to Blob URL", e); + } + + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetPathStatus, + HTTP_METHOD_GET, + url, + requestHeaders); + + op.execute(tracingContext); + return op; + } + /** + * Get the dummy success 
operation. * @param operationType type of the operation diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 31b6f0f073940..1f7ee23f39a64 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -20,10 +20,25 @@ import java.io.EOFException; import java.io.FileNotFoundException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import org.apache.hadoop.fs.impl.CombinedFileRange; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.VectoredReadUtils; import java.io.IOException; +import java.io.ByteArrayOutputStream; import java.net.HttpURLConnection; import java.util.UUID; +import XFE.Proto.BlobLayout.BlobLayout; +import XFE.Proto.BlobLayout.Range; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.PositionedReadable; @@ -73,7 +88,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final AbfsClient client; private final Statistics statistics; private final String path; - private final long contentLength; + private long contentLength; private final int bufferSize; // default buffer size private final int footerReadSize; // default buffer size to read when reading footer private final int readAheadQueueDepth; // initialized in constructor @@ -133,6 +148,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, /** ABFS instance to be held by the input stream to avoid GC close. 
*/ private final BackReference fsBackRef; private final ReadBufferManager readBufferManager; + private final byte[] layout; + private BlobLayout blobLayout; public AbfsInputStream( final AbfsClient client, @@ -177,6 +194,17 @@ public AbfsInputStream( } this.fsBackRef = abfsInputStreamContext.getFsBackRef(); contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter(); + this.layout = abfsInputStreamContext.getLayout(); + if (this.layout != null) { + try { + this.blobLayout = BlobLayout.parseFrom(this.layout); + if (this.blobLayout.getRangesCount() > 0) { + this.contentLength = this.blobLayout.getRanges(this.blobLayout.getRangesCount() - 1).getEnd() + 1; + } + } catch (IOException e) { + LOG.warn("Failed to parse blob layout", e); + } + } /* * Initialize the ReadBufferManager based on whether readAheadV2 is enabled or not. @@ -242,6 +270,64 @@ public int read(long position, byte[] buffer, int offset, int length) return bytesRead; } + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + if (!client.getAbfsConfiguration().isReadVectoredParallelEnabled()) { + super.readVectored(ranges, allocate); + return; + } + + List sortedRanges = VectoredReadUtils.validateAndSortRanges(ranges, Optional.of(contentLength)); + + for (FileRange range : sortedRanges) { + range.setData(new CompletableFuture<>()); + } + + List mergedRanges = VectoredReadUtils.mergeSortedRanges(sortedRanges, + minSeekForVectorReads(), minSeekForVectorReads(), maxReadSizeForVectorReads()); + + for (CombinedFileRange combinedRange : mergedRanges) { + client.submit(() -> { + try { + ByteBuffer buffer = allocate.apply(combinedRange.getLength()); + readRangeFrom(combinedRange, buffer); + buffer.flip(); + for (FileRange child : combinedRange.getUnderlying()) { + int start = (int) (child.getOffset() - combinedRange.getOffset()); + int end = start + child.getLength(); + ByteBuffer childBuffer = buffer.duplicate(); + childBuffer.position(start); + 
childBuffer.limit(end); + child.getData().complete(childBuffer.slice()); + } + } catch (Exception e) { + for (FileRange child : combinedRange.getUnderlying()) { + child.getData().completeExceptionally(e); + } + } + }); + } + } + + private void readRangeFrom(FileRange range, ByteBuffer buffer) throws IOException { + int length = range.getLength(); + byte[] b; + int offset = 0; + if (buffer.hasArray()) { + b = buffer.array(); + offset = buffer.arrayOffset() + buffer.position(); + } else { + b = new byte[length]; + } + + int bytesRead = read(range.getOffset(), b, offset, length); + + if (!buffer.hasArray()) { + buffer.put(b, 0, bytesRead); + } + } + @Override public int read() throws IOException { byte[] b = new byte[1]; @@ -583,6 +669,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t if (length > (b.length - offset)) { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } + length = (int) min(length, contentLength - position); final AbfsRestOperation op; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { @@ -591,9 +678,61 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t } LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); tracingContext.setPosition(String.valueOf(position)); - op = client.read(path, position, b, offset, length, - tolerateOobAppends ? 
"*" : eTag, cachedSasToken.get(), - contextEncryptionAdapter, tracingContext); + + byte[] dataKeys = null; + String endpoint = null; + if (blobLayout != null) { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + long requestStart = position; + long requestEnd = position + length; + + Range bestRange = null; + long maxOverlap = 0; + Set deduplicatedIndexes = new HashSet<>(); + + for (Range range : blobLayout.getRangesList()) { + long rangeStart = range.getStart(); + long rangeEnd = range.getEnd(); + + long overlapStart = max(requestStart, rangeStart); + long overlapEnd = min(requestEnd, rangeEnd); + long overlap = max(0, overlapEnd - overlapStart); + + if (overlap > 0) { + for (int index : range.getReadKeyIndexesList()) { + if (deduplicatedIndexes.add(index)) { + blobLayout.getReadKeys(index).writeTo(outputStream); + } + } + } + + if (overlap > maxOverlap) { + maxOverlap = overlap; + bestRange = range; + } + } + if (bestRange != null) { + int endpointIndex = bestRange.getEndpointIndex(); + if (endpointIndex < blobLayout.getEndpointsCount()) { + endpoint = blobLayout.getEndpoints(endpointIndex); + } + } + + if (outputStream.size() > 0) { + dataKeys = outputStream.toByteArray(); + } + } + } + + if (dataKeys != null) { + op = client.readWithLayout(path, position, b, offset, length, + tolerateOobAppends ? "*" : eTag, cachedSasToken.get(), + contextEncryptionAdapter, tracingContext, dataKeys, null); + } else { + op = client.read(path, position, b, offset, length, + tolerateOobAppends ? 
"*" : eTag, cachedSasToken.get(), + contextEncryptionAdapter, tracingContext); + } cachedSasToken.update(op.getSasToken()); LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + "offset = {} length = {}", position, b.length, offset, length); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index cb51fa22900e4..d6bee076fdf13 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -64,6 +64,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private ContextEncryptionAdapter contextEncryptionAdapter = null; + private byte[] layout; + /** * Constructs a new {@link AbfsInputStreamContext}. * @@ -254,6 +256,17 @@ public AbfsInputStreamContext withEncryptionAdapter( return this; } + /** + * Sets the serialized blob layout bytes that the input stream should use. + * + * @param layout serialized blob layout bytes; may be null when no layout was fetched. + * @return this instance. + */ + public AbfsInputStreamContext withLayout(final byte[] layout) { + this.layout = layout; + return this; + } + /** * Finalizes and validates the context configuration. *

@@ -351,4 +364,9 @@ public BackReference getFsBackRef() { public ContextEncryptionAdapter getEncryptionAdapter() { return contextEncryptionAdapter; } + + /** @return the serialized blob layout bytes, or null if none was set. */ + public byte[] getLayout() { + return layout; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 1b55084fb4571..32b89e27309bc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -104,6 +104,12 @@ public class AbfsRestOperation { private byte[] buffer; private int bufferOffset; private int bufferLength; + + // For operations that have both request and response body (e.g. readWithLayout) + private byte[] requestBody; + private int requestBodyOffset; + private int requestBodyLength; + private int retryCount = 0; private boolean isThrottledRequest = false; private long maxRetryCount = 0L; @@ -280,6 +286,42 @@ String getSasToken() { this.bufferLength = bufferLength; } + + /** + * Initializes a new REST operation. + * + * @param operationType The type of the REST operation (Append, ReadFile, etc). + * @param client The Blob FS client. + * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). + * @param url The full URL including query string parameters. + * @param requestHeaders The HTTP request headers. + * @param buffer Receives the response entity body; when {@code requestBody} is + * non-null the upload payload is sent from it instead of this buffer. + * @param bufferOffset An offset into the buffer where the data begins. + * @param bufferLength The length of the data in the buffer. + * @param requestBody For operations that have both request and response body. + * @param requestBodyOffset An offset into the requestBody where the data begins. 
+ * @param requestBodyLength The length of the data in the requestBody. + * @param sasToken A sasToken for optional re-use by AbfsInputStream/AbfsOutputStream. + */ + AbfsRestOperation(AbfsRestOperationType operationType, + AbfsClient client, + String method, + URL url, + List requestHeaders, + byte[] buffer, + int bufferOffset, + int bufferLength, + byte[] requestBody, + int requestBodyOffset, + int requestBodyLength, + String sasToken, + final AbfsConfiguration abfsConfiguration) { + this(operationType, client, method, url, requestHeaders, buffer, bufferOffset, bufferLength, sasToken, abfsConfiguration); + this.requestBody = requestBody; + this.requestBodyOffset = requestBodyOffset; + this.requestBodyLength = requestBodyLength; + } + /** * Execute a AbfsRestOperation. Track the Duration of a request if * abfsCounters isn't null. @@ -416,7 +458,7 @@ private boolean executeHttpOperation(final int retryCount, incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); tracingContext.constructHeader(httpOperation, failureReason, retryPolicy.getAbbreviation()); - signRequest(httpOperation, hasRequestBody ? bufferLength : 0, tracingContext.isMetricCall()); + signRequest(httpOperation, hasRequestBody ? (requestBody != null ? 
requestBodyLength : bufferLength) : 0, tracingContext.isMetricCall()); } catch (IOException e) { LOG.debug("Auth failure: {}, {}", method, url); @@ -430,10 +472,16 @@ private boolean executeHttpOperation(final int retryCount, httpOperation.getRequestProperties()); intercept.sendingRequest(operationType, abfsCounters); if (hasRequestBody) { - httpOperation.sendPayload(buffer, bufferOffset, bufferLength); - incrementCounter(AbfsStatistic.SEND_REQUESTS, 1); - if (!(operationType.name().equals(PUT_BLOCK_LIST))) { - incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength); + if (requestBody != null) { + httpOperation.sendPayload(requestBody, requestBodyOffset, requestBodyLength); + incrementCounter(AbfsStatistic.SEND_REQUESTS, 1); + incrementCounter(AbfsStatistic.BYTES_SENT, requestBodyLength); + } else { + httpOperation.sendPayload(buffer, bufferOffset, bufferLength); + incrementCounter(AbfsStatistic.SEND_REQUESTS, 1); + if (!(operationType.name().equals(PUT_BLOCK_LIST))) { + incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength); + } } } httpOperation.processResponse(buffer, bufferOffset, bufferLength); diff --git a/hadoop-tools/hadoop-azure/src/main/proto/BlobLayout.proto b/hadoop-tools/hadoop-azure/src/main/proto/BlobLayout.proto new file mode 100644 index 0000000000000..95f7bcf7ceda4 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/proto/BlobLayout.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; +option java_multiple_files = true; +package XFE.Proto.BlobLayout; + + +message Range +{ + uint64 Start = 1; + uint64 End = 2; + uint32 EndpointIndex = 3; + repeated uint32 ReadKeyIndexes = 4; +} + +message BlobLayout +{ + string Id = 1; + uint64 Expiry = 2; + repeated bytes ReadKeys = 3; + repeated string Endpoints = 4; + repeated Range Ranges = 5; +}