Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_GET_LAYOUT_ON_OPEN;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READ_VECTORED_PARALLEL;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_ENABLE_GET_LAYOUT_ON_OPEN;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_ENABLE_READ_VECTORED_PARALLEL;

/**
* Configuration for Azure Blob FileSystem.
Expand Down Expand Up @@ -303,6 +307,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ALWAYS_READ_BUFFER_SIZE)
private boolean alwaysReadBufferSize;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_GET_LAYOUT_ON_OPEN,
DefaultValue = DEFAULT_ENABLE_GET_LAYOUT_ON_OPEN)
private boolean enableGetLayoutOnOpen;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
DefaultValue = DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
Expand Down Expand Up @@ -490,6 +498,8 @@ public class AbfsConfiguration{
private long sasTokenRenewPeriodForStreamsInSeconds;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_READ_VECTORED_PARALLEL, DefaultValue = DEFAULT_ENABLE_READ_VECTORED_PARALLEL)
private boolean enableReadVectoredParallel; @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
private boolean enableAbfsListIterator;

Expand Down Expand Up @@ -1218,6 +1228,10 @@ public long getSasTokenRenewPeriodForStreamsInSeconds() {
return this.sasTokenRenewPeriodForStreamsInSeconds;
}

public boolean isReadVectoredParallelEnabled() {
return this.enableReadVectoredParallel;
}

public String getAzureBlockLocationHost() {
return this.azureBlockLocationHost;
}
Expand Down Expand Up @@ -1298,6 +1312,10 @@ public boolean shouldReadBufferSizeAlways() {
return this.alwaysReadBufferSize;
}

public boolean isGetLayoutOnOpenEnabled() {
return this.enableGetLayoutOnOpen;
}

public boolean isFlushEnabled() {
return this.enableFlush;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -890,13 +891,14 @@ public AbfsInputStream openFileForRead(Path path,
long contentLength;
ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
/*
* GetPathStatus API has to be called in case of:
* 1. fileStatus is null or not an object of VersionedFileStatus: as eTag
* would not be there in the fileStatus object.
* 2. fileStatus is an object of VersionedFileStatus and the object doesn't
* have encryptionContext field when client's encryptionType is
* ENCRYPTION_CONTEXT.
*/
* GetPathStatus API has to be called in case of:
* 1. fileStatus is null or not an object of VersionedFileStatus: as eTag
* would not be there in the fileStatus object.
* 2. fileStatus is an object of VersionedFileStatus and the object doesn't
* have encryptionContext field when client's encryptionType is
* ENCRYPTION_CONTEXT.
*/
byte[] layout = null;
if ((fileStatus instanceof VersionedFileStatus) && (
getClient().getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT
|| ((VersionedFileStatus) fileStatus).getEncryptionContext()
Expand All @@ -917,9 +919,24 @@ public AbfsInputStream openFileForRead(Path path,
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
AbfsHttpOperation op = getClient().getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE;
AbfsHttpOperation op;
if (abfsConfiguration.isGetLayoutOnOpenEnabled()) {
AbfsRestOperation restOp = getClient().getLayoutOperation(relativePath, tracingContext);
op = restOp.getResult();
InputStream stream = op.getListResultStream();
if (stream != null) {
int size = stream.available();
layout = new byte[size];
int read = stream.read(layout);
if (read != size) {
LOG.warn("Could not read all bytes from list result stream");
}
}
} else {
op = getClient().getPathStatus(relativePath, false,
tracingContext, null).getResult();
}
resourceType = /*getClient().checkIsDir(op) ? DIRECTORY :*/ FILE;
contentLength = extractContentLength(op);
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
/*
Expand Down Expand Up @@ -954,7 +971,7 @@ public AbfsInputStream openFileForRead(Path path,
return new AbfsInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
contextEncryptionAdapter).withLayout(layout),
eTag, tracingContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public final class AbfsHttpConstants {
public enum ApiVersion {

DEC_12_2019("2019-12-12"),
FEB_10_2020("2020-02-10"),
APR_10_2021("2021-04-10"),
AUG_03_2023("2023-08-03"),
NOV_04_2024("2024-11-04"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
/** Enable issuing GetLayout requests when files are opened to retrieve blob layout metadata. */
public static final String FS_AZURE_ENABLE_GET_LAYOUT_ON_OPEN = "fs.azure.enable.getLayoutOnOpen";
/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
Expand Down Expand Up @@ -485,6 +487,8 @@ public static String containerProperty(String property, String fsName, String ac
/** Key for SAS token provider: {@value}. **/
public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = "fs.azure.sas.token.provider.type";

public static final String FS_AZURE_ENABLE_READ_VECTORED_PARALLEL = "fs.azure.enable.read.vectored.parallel";

/** For performance, AbfsInputStream/AbfsOutputStream re-use SAS tokens until the expiry is within this number of seconds. **/
public static final String FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS = "fs.azure.sas.token.renew.period.for.streams";

Expand Down Expand Up @@ -546,7 +550,7 @@ public static String containerProperty(String property, String fsName, String ac
public static final String FS_AZURE_BLOB_COPY_MAX_WAIT_MILLIS = "fs.azure.blob.copy.max.wait.millis";
/**Blob rename lease refresh duration: {@value}*/
public static final String FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION
= "fs.azure.blob.atomic.rename.lease.refresh.duration";
= "fs.azure.blob.atomic.rename.lease.refresh.duration";
/**Maximum number of blob information enqueued in memory for rename or delete orchestration: {@value}*/
public static final String FS_AZURE_PRODUCER_QUEUE_MAX_SIZE = "fs.azure.blob.dir.list.producer.queue.max.size";
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
public static final boolean DEFAULT_ENABLE_GET_LAYOUT_ON_OPEN = false;
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB
public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
Expand Down Expand Up @@ -157,6 +158,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_CHECK_ACCESS = true;
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
public static final boolean DEFAULT_ENABLE_READ_VECTORED_PARALLEL = false;

public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
Expand Down Expand Up @@ -240,7 +242,7 @@ public final class FileSystemConfigurations {
* Default buffer option: {@value}.
*/
public static final String DATA_BLOCKS_BUFFER_DEFAULT =
DATA_BLOCKS_BYTEBUFFER;
DATA_BLOCKS_BYTEBUFFER;

/**
* IO rate limit. Value: {@value}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,7 @@ public final class HttpHeaderConfigurations {
*/
public static final String X_MS_CLIENT_TRANSACTION_ID = "x-ms-client-transaction-id";

public static final String X_MS_BLOB_LAYOUT = "x-ms-blob-layout";

private HttpHeaderConfigurations() {}
}
Loading