From a155f85cb66ceba96fcec42c16d0ca8ba8b77a6f Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Thu, 22 Jan 2026 18:22:25 +0530 Subject: [PATCH 01/17] [ENG-35624]: Add azure storage support for metadata extractor --- lakeview/build.gradle | 4 + .../main/java/ai/onehouse/RuntimeModule.java | 8 + .../config/models/common/AzureConfig.java | 31 +++ .../common/FileSystemConfiguration.java | 1 + .../onehouse/constants/StorageConstants.java | 11 +- .../storage/AzureAsyncStorageClient.java | 211 ++++++++++++++++++ .../ai/onehouse/storage/StorageUtils.java | 26 ++- .../providers/AzureStorageClientProvider.java | 116 ++++++++++ .../java/ai/onehouse/TestRuntimeModule.java | 5 + 9 files changed, 404 insertions(+), 9 deletions(-) create mode 100644 lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java create mode 100644 lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java create mode 100644 lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java diff --git a/lakeview/build.gradle b/lakeview/build.gradle index fe50b6ed..06d6633c 100644 --- a/lakeview/build.gradle +++ b/lakeview/build.gradle @@ -61,6 +61,10 @@ dependencies { exclude group: "com.google.protobuf", module: "protobuf-java-utils" } + implementation platform('com.azure:azure-sdk-bom:1.2.25') + implementation 'com.azure:azure-storage-blob' + implementation 'com.azure:azure-identity' + implementation 'org.springframework:spring-beans:5.3.23' testImplementation "org.mockito:mockito-core:${versions.mockito}" diff --git a/lakeview/src/main/java/ai/onehouse/RuntimeModule.java b/lakeview/src/main/java/ai/onehouse/RuntimeModule.java index f5b5bd84..960232b8 100644 --- a/lakeview/src/main/java/ai/onehouse/RuntimeModule.java +++ b/lakeview/src/main/java/ai/onehouse/RuntimeModule.java @@ -10,9 +10,11 @@ import ai.onehouse.config.ConfigProvider; import ai.onehouse.config.models.common.FileSystemConfiguration; import ai.onehouse.storage.AsyncStorageClient; 
+import ai.onehouse.storage.AzureAsyncStorageClient; import ai.onehouse.storage.GCSAsyncStorageClient; import ai.onehouse.storage.S3AsyncStorageClient; import ai.onehouse.storage.StorageUtils; +import ai.onehouse.storage.providers.AzureStorageClientProvider; import ai.onehouse.storage.providers.GcsClientProvider; import ai.onehouse.storage.providers.S3AsyncClientProvider; @@ -128,10 +130,13 @@ static AsyncStorageClient providesAsyncStorageClientForDiscovery( StorageUtils storageUtils, @TableDiscoveryS3ObjectStorageClient S3AsyncClientProvider s3AsyncClientProvider, GcsClientProvider gcsClientProvider, + AzureStorageClientProvider azureStorageClientProvider, ExecutorService executorService) { FileSystemConfiguration fileSystemConfiguration = config.getFileSystemConfiguration(); if (fileSystemConfiguration.getS3Config() != null) { return new S3AsyncStorageClient(s3AsyncClientProvider, storageUtils, executorService); + } else if (fileSystemConfiguration.getAzureConfig() != null) { + return new AzureAsyncStorageClient(azureStorageClientProvider, storageUtils, executorService); } else { return new GCSAsyncStorageClient(gcsClientProvider, storageUtils, executorService); } @@ -145,10 +150,13 @@ static AsyncStorageClient providesAsyncStorageClientForUpload( StorageUtils storageUtils, @TableMetadataUploadS3ObjectStorageClient S3AsyncClientProvider s3AsyncClientProvider, GcsClientProvider gcsClientProvider, + AzureStorageClientProvider azureStorageClientProvider, ExecutorService executorService) { FileSystemConfiguration fileSystemConfiguration = config.getFileSystemConfiguration(); if (fileSystemConfiguration.getS3Config() != null) { return new S3AsyncStorageClient(s3AsyncClientProvider, storageUtils, executorService); + } else if (fileSystemConfiguration.getAzureConfig() != null) { + return new AzureAsyncStorageClient(azureStorageClientProvider, storageUtils, executorService); } else { return new GCSAsyncStorageClient(gcsClientProvider, storageUtils, executorService); } 
diff --git a/lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java b/lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java new file mode 100644 index 00000000..4ba1e31d --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java @@ -0,0 +1,31 @@ +package ai.onehouse.config.models.common; + +import java.util.Optional; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.extern.jackson.Jacksonized; + +@Builder +@Getter +@Jacksonized +@EqualsAndHashCode +public class AzureConfig { + @NonNull private String accountName; + + // Optional authentication methods + + // Option 1: Account Key (for dev/testing, never expires) + @Builder.Default private Optional accountKey = Optional.empty(); + + // Option 2: Connection String (alternative to account key) + @Builder.Default private Optional connectionString = Optional.empty(); + + // Option 3: Managed Identity or Service Principal + @Builder.Default private Optional tenantId = Optional.empty(); + @Builder.Default private Optional clientId = Optional.empty(); + + // For Service Principal authentication (Pull Model support) + @Builder.Default private Optional clientSecret = Optional.empty(); +} diff --git a/lakeview/src/main/java/ai/onehouse/config/models/common/FileSystemConfiguration.java b/lakeview/src/main/java/ai/onehouse/config/models/common/FileSystemConfiguration.java index b681385c..26799b48 100644 --- a/lakeview/src/main/java/ai/onehouse/config/models/common/FileSystemConfiguration.java +++ b/lakeview/src/main/java/ai/onehouse/config/models/common/FileSystemConfiguration.java @@ -12,4 +12,5 @@ public class FileSystemConfiguration { private S3Config s3Config; private GCSConfig gcsConfig; + private AzureConfig azureConfig; } diff --git a/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java b/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java index 
e703754f..4ee07c99 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java @@ -6,9 +6,16 @@ public class StorageConstants { private StorageConstants() {} // typical s3 path: "s3://bucket-name/path/to/object" - // gcs path format "gs:// [bucket] /path/to/file" + // gcs path format: "gs://bucket/path/to/file" + // azure blob format: "https://account.blob.core.windows.net/container/path/to/blob" + // azure adls gen2 format: "https://account.dfs.core.windows.net/container/path/to/file" public static final Pattern OBJECT_STORAGE_URI_PATTERN = - Pattern.compile("^(s3://|gs://)([^/]+)(/.*)?"); + Pattern.compile("^(?:(s3://|gs://)|https://[^.]+\\.(?:blob|dfs)\\.core\\.windows\\.net/)([^/]+)(/.*)?$"); + + // Azure-specific pattern to extract account name from URI + // Group 1: account name, Group 2: container name, Group 3: blob path + public static final Pattern AZURE_STORAGE_URI_PATTERN = + Pattern.compile("^https://([^.]+)\\.(?:blob|dfs)\\.core\\.windows\\.net/([^/]+)(/.*)?$"); // https://cloud.google.com/compute/docs/naming-resources#resource-name-format public static final String GCP_RESOURCE_NAME_FORMAT = "^[a-z]([-a-z0-9]*[a-z0-9])$"; diff --git a/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java b/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java new file mode 100644 index 00000000..07f93a58 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java @@ -0,0 +1,211 @@ +package ai.onehouse.storage; + +import ai.onehouse.exceptions.AccessDeniedException; +import ai.onehouse.exceptions.NoSuchKeyException; +import ai.onehouse.exceptions.ObjectStorageClientException; +import ai.onehouse.exceptions.RateLimitException; +import ai.onehouse.storage.models.File; +import ai.onehouse.storage.models.FileStreamData; +import ai.onehouse.storage.providers.AzureStorageClientProvider; +import 
com.azure.core.http.rest.PagedFlux; +import com.azure.core.http.rest.PagedResponse; +import com.azure.core.util.BinaryData; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobContainerAsyncClient; +import com.azure.storage.blob.BlobServiceAsyncClient; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.ListBlobsOptions; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import java.io.ByteArrayInputStream; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; + +@Slf4j +public class AzureAsyncStorageClient extends AbstractAsyncStorageClient { + private final AzureStorageClientProvider azureStorageClientProvider; + + @Inject + public AzureAsyncStorageClient( + @Nonnull AzureStorageClientProvider azureStorageClientProvider, + @Nonnull StorageUtils storageUtils, + @Nonnull ExecutorService executorService) { + super(executorService, storageUtils); + this.azureStorageClientProvider = azureStorageClientProvider; + } + + @Override + public CompletableFuture>> fetchObjectsByPage( + String containerName, String prefix, String continuationToken, String startAfter) { + + log.debug( + "fetching files in container {} with prefix {} continuationToken {} startAfter {}", + containerName, + prefix, + continuationToken, + startAfter); + + return CompletableFuture.supplyAsync( + () -> { + try { + BlobServiceAsyncClient blobServiceClient = + azureStorageClientProvider.getAzureAsyncClient(); + BlobContainerAsyncClient containerClient = + 
blobServiceClient.getBlobContainerAsyncClient(containerName); + + ListBlobsOptions options = new ListBlobsOptions(); + if (StringUtils.isNotBlank(prefix)) { + options.setPrefix(prefix); + } + + PagedFlux pagedFlux = containerClient.listBlobsByHierarchy("/", options); + + List files = new ArrayList<>(); + String nextContinuationToken = null; + + // Get single page with continuation token + PagedResponse page = + StringUtils.isNotBlank(continuationToken) + ? pagedFlux.byPage(continuationToken).blockFirst() + : pagedFlux.byPage().blockFirst(); + + if (page != null) { + // Process items in the page + page.getElements() + .forEach( + blobItem -> { + String blobName = blobItem.getName(); + boolean isDirectory = blobItem.isPrefix() != null && blobItem.isPrefix(); + String fileName = blobName.replaceFirst("^" + prefix, ""); + + files.add( + File.builder() + .filename(fileName) + .lastModifiedAt( + isDirectory + ? Instant.EPOCH + : blobItem.getProperties().getLastModified().toInstant()) + .isDirectory(isDirectory) + .build()); + }); + + // Get continuation token for next page + nextContinuationToken = page.getContinuationToken(); + } + + return Pair.of(nextContinuationToken, files); + } catch (Exception ex) { + log.error("Failed to fetch objects by page", ex); + throw clientException(ex, "fetchObjectsByPage", containerName); + } + }, + executorService); + } + + @VisibleForTesting + CompletableFuture readBlob(String azureUri) { + log.debug("Reading Azure blob: {}", azureUri); + return CompletableFuture.supplyAsync( + () -> { + try { + BlobAsyncClient blobClient = getBlobClient(azureUri); + return blobClient.downloadContent().block(); + } catch (Exception ex) { + log.error("Failed to read blob", ex); + throw clientException(ex, "readBlob", azureUri); + } + }, + executorService); + } + + @Override + public CompletableFuture streamFileAsync(String azureUri) { + return readBlob(azureUri) + .thenApply( + binaryData -> + FileStreamData.builder() + .inputStream(new 
ByteArrayInputStream(binaryData.toBytes())) + .fileSize((long) binaryData.toBytes().length) + .build()); + } + + @Override + public CompletableFuture readFileAsBytes(String azureUri) { + return readBlob(azureUri).thenApply(BinaryData::toBytes); + } + + private BlobAsyncClient getBlobClient(String azureUri) { + String containerName = storageUtils.getBucketNameFromUri(azureUri); + String blobPath = storageUtils.getPathFromUrl(azureUri); + + BlobServiceAsyncClient blobServiceClient = azureStorageClientProvider.getAzureAsyncClient(); + BlobContainerAsyncClient containerClient = + blobServiceClient.getBlobContainerAsyncClient(containerName); + return containerClient.getBlobAsyncClient(blobPath); + } + + @Override + protected RuntimeException clientException(Throwable ex, String operation, String path) { + Throwable wrappedException = ex.getCause() != null ? ex.getCause() : ex; + + if (wrappedException instanceof BlobStorageException) { + BlobStorageException blobException = (BlobStorageException) wrappedException; + BlobErrorCode errorCode = blobException.getErrorCode(); + int statusCode = blobException.getStatusCode(); + + log.error( + "Error in Azure operation: {} on path: {} code: {} status: {} message: {}", + operation, + path, + errorCode, + statusCode, + blobException.getMessage()); + + // Map to AccessDeniedException + if (statusCode == 403 || statusCode == 401) { + return new AccessDeniedException( + String.format( + "AccessDenied for operation: %s on path: %s with message: %s", + operation, path, blobException.getMessage())); + } + + // Map to NoSuchKeyException + if (errorCode == BlobErrorCode.BLOB_NOT_FOUND + || errorCode == BlobErrorCode.CONTAINER_NOT_FOUND) { + return new NoSuchKeyException( + String.format("NoSuchKey for operation: %s on path: %s", operation, path)); + } + + // Map to RateLimitException + if (statusCode == 429 || statusCode == 503) { + return new RateLimitException( + String.format("Throttled by Azure for operation: %s on path: %s", 
operation, path)); + } + } else if (wrappedException instanceof AccessDeniedException + || wrappedException instanceof NoSuchKeyException + || wrappedException instanceof RateLimitException) { + return (RuntimeException) wrappedException; + } + + return new ObjectStorageClientException(ex); + } + + @Override + public void refreshClient() { + azureStorageClientProvider.refreshClient(); + } + + @Override + public void initializeClient() { + azureStorageClientProvider.getAzureAsyncClient(); + } +} diff --git a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java index ccd91ba4..18c6f8e7 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java +++ b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java @@ -1,5 +1,6 @@ package ai.onehouse.storage; +import static ai.onehouse.constants.StorageConstants.AZURE_STORAGE_URI_PATTERN; import static ai.onehouse.constants.StorageConstants.OBJECT_STORAGE_URI_PATTERN; import java.util.regex.Matcher; @@ -13,15 +14,14 @@ public String getPathFromUrl(String uri) { throw new IllegalArgumentException(INVALID_STORAGE_URI_ERROR_MSG + uri); } - String prefix = ""; - - // Remove the scheme and bucket name from the S3 path - int startIndex = uri.indexOf('/', 5); // Skip 's3://' and 'gs://' - if (startIndex != -1) { - prefix = uri.substring(startIndex + 1); + Matcher matcher = OBJECT_STORAGE_URI_PATTERN.matcher(uri); + if (matcher.matches()) { + String path = matcher.group(3); + // Remove leading slash if present + return (path != null && path.startsWith("/")) ? path.substring(1) : (path != null ? 
path : ""); } - return prefix; + return ""; } public String constructFileUri(String directoryUri, String filePath) { @@ -40,4 +40,16 @@ public String getBucketNameFromUri(String uri) { } throw new IllegalArgumentException(INVALID_STORAGE_URI_ERROR_MSG + uri); } + + public String getAccountNameFromAzureUri(String uri) { + Matcher matcher = AZURE_STORAGE_URI_PATTERN.matcher(uri); + if (matcher.matches()) { + return matcher.group(1); + } + throw new IllegalArgumentException("Invalid Azure storage Uri: " + uri); + } + + public boolean isAzureUri(String uri) { + return AZURE_STORAGE_URI_PATTERN.matcher(uri).matches(); + } } diff --git a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java new file mode 100644 index 00000000..3cc5f1d8 --- /dev/null +++ b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java @@ -0,0 +1,116 @@ +package ai.onehouse.storage.providers; + +import ai.onehouse.config.Config; +import ai.onehouse.config.models.common.AzureConfig; +import ai.onehouse.config.models.common.FileSystemConfiguration; +import com.azure.identity.ClientSecretCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.DefaultAzureCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobServiceAsyncClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import javax.annotation.Nonnull; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AzureStorageClientProvider { + private final AzureConfig azureConfig; + private static BlobServiceAsyncClient azureAsyncClient; + private static final Logger logger = 
LoggerFactory.getLogger(AzureStorageClientProvider.class); + + @Inject + public AzureStorageClientProvider(@Nonnull Config config) { + FileSystemConfiguration fileSystemConfiguration = config.getFileSystemConfiguration(); + this.azureConfig = fileSystemConfiguration.getAzureConfig(); + } + + @VisibleForTesting + protected BlobServiceAsyncClient createAzureAsyncClient() { + logger.debug("Instantiating Azure Blob Storage client"); + validateAzureConfig(azureConfig); + + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + String endpoint = String.format("https://%s.blob.core.windows.net", azureConfig.getAccountName()); + builder.endpoint(endpoint); + + // Option 1: Connection String (includes account key and endpoint) + if (azureConfig.getConnectionString().isPresent()) { + logger.debug("Using connection string for authentication"); + builder.connectionString(azureConfig.getConnectionString().get()); + return builder.buildAsyncClient(); + } + + // Option 2: Account Key (shared key credential) + if (azureConfig.getAccountKey().isPresent()) { + logger.debug("Using account key for authentication"); + StorageSharedKeyCredential credential = + new StorageSharedKeyCredential( + azureConfig.getAccountName(), azureConfig.getAccountKey().get()); + builder.credential(credential); + return builder.buildAsyncClient(); + } + + // Option 3: Service Principal (client secret credential) + if (azureConfig.getTenantId().isPresent() + && azureConfig.getClientId().isPresent() + && azureConfig.getClientSecret().isPresent()) { + logger.debug("Using service principal (client secret) for authentication"); + ClientSecretCredential credential = + new ClientSecretCredentialBuilder() + .tenantId(azureConfig.getTenantId().get()) + .clientId(azureConfig.getClientId().get()) + .clientSecret(azureConfig.getClientSecret().get()) + .build(); + builder.credential(credential); + return builder.buildAsyncClient(); + } + + // Option 4: Managed Identity (tenantId + clientId, no secret) + 
if (azureConfig.getTenantId().isPresent() && azureConfig.getClientId().isPresent()) { + logger.debug("Using managed identity for authentication"); + DefaultAzureCredential credential = + new DefaultAzureCredentialBuilder() + .tenantId(azureConfig.getTenantId().get()) + .managedIdentityClientId(azureConfig.getClientId().get()) + .build(); + builder.credential(credential); + return builder.buildAsyncClient(); + } + + // Option 5: Default Azure Credential (fallback to environment-based auth) + logger.debug("Using default Azure credential chain for authentication"); + DefaultAzureCredential credential = new DefaultAzureCredentialBuilder().build(); + builder.credential(credential); + return builder.buildAsyncClient(); + } + + public BlobServiceAsyncClient getAzureAsyncClient() { + if (azureAsyncClient == null) { + azureAsyncClient = createAzureAsyncClient(); + } + return azureAsyncClient; + } + + public void refreshClient() { + azureAsyncClient = createAzureAsyncClient(); + } + + private void validateAzureConfig(AzureConfig azureConfig) { + if (azureConfig == null) { + throw new IllegalArgumentException("Azure Config not found"); + } + + if (StringUtils.isBlank(azureConfig.getAccountName())) { + throw new IllegalArgumentException("Azure storage account name cannot be empty"); + } + } + + @VisibleForTesting + static void resetAzureAsyncClient() { + azureAsyncClient = null; + } +} diff --git a/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java b/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java index bb5aa2a6..5c5430ba 100644 --- a/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java +++ b/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java @@ -19,10 +19,12 @@ import ai.onehouse.metadata_extractor.TableDiscoveryService; import ai.onehouse.metadata_extractor.TimelineCommitInstantsUploader; import ai.onehouse.storage.AsyncStorageClient; +import ai.onehouse.storage.AzureAsyncStorageClient; import ai.onehouse.storage.GCSAsyncStorageClient; import 
ai.onehouse.storage.PresignedUrlFileUploader; import ai.onehouse.storage.S3AsyncStorageClient; import ai.onehouse.storage.StorageUtils; +import ai.onehouse.storage.providers.AzureStorageClientProvider; import ai.onehouse.storage.providers.GcsClientProvider; import ai.onehouse.storage.providers.S3AsyncClientProvider; import com.google.inject.AbstractModule; @@ -109,6 +111,7 @@ void testProvidesAsyncStorageClient(FileSystem fileSystemType) { FileSystemConfiguration mockFileSystemConfiguration = mock(FileSystemConfiguration.class); S3AsyncClientProvider mockS3AsyncClientProvider = mock(S3AsyncClientProvider.class); GcsClientProvider mockGcsClientProvider = mock(GcsClientProvider.class); + AzureStorageClientProvider mockAzureStorageClientProvider = mock(AzureStorageClientProvider.class); when(mockConfig.getFileSystemConfiguration()).thenReturn(mockFileSystemConfiguration); @@ -128,6 +131,7 @@ void testProvidesAsyncStorageClient(FileSystem fileSystemType) { mockStorageUtils, mockS3AsyncClientProvider, mockGcsClientProvider, + mockAzureStorageClientProvider, mockExecutorService); if (FileSystem.S3.equals(fileSystemType)) { assertInstanceOf(S3AsyncStorageClient.class, asyncStorageClientForDiscovery); @@ -141,6 +145,7 @@ void testProvidesAsyncStorageClient(FileSystem fileSystemType) { mockStorageUtils, mockS3AsyncClientProvider, mockGcsClientProvider, + mockAzureStorageClientProvider, mockExecutorService); if (FileSystem.S3.equals(fileSystemType)) { Assertions.assertInstanceOf(S3AsyncStorageClient.class, asyncStorageClientForUpload); From 5891e31975fafdff4112d99fdc50ee4543dc0609 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Thu, 22 Jan 2026 18:51:19 +0530 Subject: [PATCH 02/17] Fix code --- .../java/ai/onehouse/storage/StorageUtils.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java index 18c6f8e7..88e497d9 
100644 --- a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java +++ b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java @@ -1,5 +1,7 @@ package ai.onehouse.storage; +import org.apache.commons.lang3.StringUtils; + import static ai.onehouse.constants.StorageConstants.AZURE_STORAGE_URI_PATTERN; import static ai.onehouse.constants.StorageConstants.OBJECT_STORAGE_URI_PATTERN; @@ -10,18 +12,13 @@ public class StorageUtils { public String getPathFromUrl(String uri) { - if (!OBJECT_STORAGE_URI_PATTERN.matcher(uri).matches()) { - throw new IllegalArgumentException(INVALID_STORAGE_URI_ERROR_MSG + uri); - } - Matcher matcher = OBJECT_STORAGE_URI_PATTERN.matcher(uri); - if (matcher.matches()) { - String path = matcher.group(3); - // Remove leading slash if present - return (path != null && path.startsWith("/")) ? path.substring(1) : (path != null ? path : ""); + if (!matcher.matches()) { + throw new IllegalArgumentException(INVALID_STORAGE_URI_ERROR_MSG + uri); } - return ""; + String path = matcher.group(3); + return path == null ? 
StringUtils.EMPTY : path.replaceFirst("^/", ""); } public String constructFileUri(String directoryUri, String filePath) { From 04a5dc358442e332688765591e1ff07f9a6e126d Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Thu, 22 Jan 2026 18:56:19 +0530 Subject: [PATCH 03/17] Remove unwanted changes --- .../ai/onehouse/constants/StorageConstants.java | 15 ++++++--------- .../java/ai/onehouse/storage/StorageUtils.java | 13 ------------- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java b/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java index 4ee07c99..33f373fe 100644 --- a/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java +++ b/lakeview/src/main/java/ai/onehouse/constants/StorageConstants.java @@ -5,18 +5,15 @@ public class StorageConstants { private StorageConstants() {} - // typical s3 path: "s3://bucket-name/path/to/object" - // gcs path format: "gs://bucket/path/to/file" - // azure blob format: "https://account.blob.core.windows.net/container/path/to/blob" - // azure adls gen2 format: "https://account.dfs.core.windows.net/container/path/to/file" + /* + * typical s3 path: "s3://bucket-name/path/to/object" + * gcs path format: "gs://bucket/path/to/file" + * azure blob format: "https://account.blob.core.windows.net/container/path/to/blob" + * azure adls gen2 format: "https://account.dfs.core.windows.net/container/path/to/file" + */ public static final Pattern OBJECT_STORAGE_URI_PATTERN = Pattern.compile("^(?:(s3://|gs://)|https://[^.]+\\.(?:blob|dfs)\\.core\\.windows\\.net/)([^/]+)(/.*)?$"); - // Azure-specific pattern to extract account name from URI - // Group 1: account name, Group 2: container name, Group 3: blob path - public static final Pattern AZURE_STORAGE_URI_PATTERN = - Pattern.compile("^https://([^.]+)\\.(?:blob|dfs)\\.core\\.windows\\.net/([^/]+)(/.*)?$"); - // https://cloud.google.com/compute/docs/naming-resources#resource-name-format 
public static final String GCP_RESOURCE_NAME_FORMAT = "^[a-z]([-a-z0-9]*[a-z0-9])$"; } diff --git a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java index 88e497d9..cad5c08f 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java +++ b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java @@ -2,7 +2,6 @@ import org.apache.commons.lang3.StringUtils; -import static ai.onehouse.constants.StorageConstants.AZURE_STORAGE_URI_PATTERN; import static ai.onehouse.constants.StorageConstants.OBJECT_STORAGE_URI_PATTERN; import java.util.regex.Matcher; @@ -37,16 +36,4 @@ public String getBucketNameFromUri(String uri) { } throw new IllegalArgumentException(INVALID_STORAGE_URI_ERROR_MSG + uri); } - - public String getAccountNameFromAzureUri(String uri) { - Matcher matcher = AZURE_STORAGE_URI_PATTERN.matcher(uri); - if (matcher.matches()) { - return matcher.group(1); - } - throw new IllegalArgumentException("Invalid Azure storage Uri: " + uri); - } - - public boolean isAzureUri(String uri) { - return AZURE_STORAGE_URI_PATTERN.matcher(uri).matches(); - } } From d5d8124e4c871162c255ebd4d207b8040e115140 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Thu, 22 Jan 2026 19:13:33 +0530 Subject: [PATCH 04/17] Add comment --- .../src/main/java/ai/onehouse/storage/StorageUtils.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java index cad5c08f..c51ef8ff 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java +++ b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java @@ -16,6 +16,14 @@ public String getPathFromUrl(String uri) { throw new IllegalArgumentException(INVALID_STORAGE_URI_ERROR_MSG + uri); } + /* + * Group 3 extracts the path portion after the bucket/container name from the URI. 
+ * Examples: + * s3://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt + * gs://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt + * https://account.blob.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt + * https://account.dfs.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt + */ String path = matcher.group(3); return path == null ? StringUtils.EMPTY : path.replaceFirst("^/", ""); } From 97191925790ec0719d44892b3ed51c6ab13141d8 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Thu, 22 Jan 2026 19:21:13 +0530 Subject: [PATCH 05/17] Add comments --- .../ai/onehouse/storage/StorageUtils.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java index c51ef8ff..255dfb53 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java +++ b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java @@ -9,6 +9,14 @@ public class StorageUtils { private static final String INVALID_STORAGE_URI_ERROR_MSG = "Invalid Object storage Uri: "; + /** + * Group 3 extracts the path portion after the bucket/container name from the URI. + * Examples: + * s3://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt + * gs://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt + * https://account.blob.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt + * https://account.dfs.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt + */ public String getPathFromUrl(String uri) { Matcher matcher = OBJECT_STORAGE_URI_PATTERN.matcher(uri); @@ -16,14 +24,6 @@ public String getPathFromUrl(String uri) { throw new IllegalArgumentException(INVALID_STORAGE_URI_ERROR_MSG + uri); } - /* - * Group 3 extracts the path portion after the bucket/container name from the URI. 
- * Examples: - * s3://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt - * gs://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt - * https://account.blob.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt - * https://account.dfs.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt - */ String path = matcher.group(3); return path == null ? StringUtils.EMPTY : path.replaceFirst("^/", ""); } @@ -37,6 +37,14 @@ public String constructFileUri(String directoryUri, String filePath) { filePath.startsWith("/") ? filePath.substring(1) : filePath); } + /** + * Group 2 extracts the bucket/container name from the URI. + * Examples: + * s3://my-bucket-s3/path/to/file.txt -> group(2) = my-bucket-s3 + * gs://my-bucket-gs/path/to/file.txt -> group(2) = my-bucket-gs + * https://account.blob.core.windows.net/container/path/file.txt -> group(2) = container + * https://account.dfs.core.windows.net/container/path/file.txt -> group(2) = container + */ public String getBucketNameFromUri(String uri) { Matcher matcher = OBJECT_STORAGE_URI_PATTERN.matcher(uri); if (matcher.matches()) { From df0a5068b2111e6ccd63378f8082a2312d7cb113 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 08:26:04 +0530 Subject: [PATCH 06/17] Remove sonar issue --- .../providers/AzureStorageClientProvider.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java index 3cc5f1d8..64e6779e 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java +++ b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java @@ -12,6 +12,7 @@ import com.azure.storage.common.StorageSharedKeyCredential; import com.google.common.annotations.VisibleForTesting; import 
com.google.inject.Inject; +import java.util.Optional; import javax.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -38,44 +39,46 @@ protected BlobServiceAsyncClient createAzureAsyncClient() { builder.endpoint(endpoint); // Option 1: Connection String (includes account key and endpoint) - if (azureConfig.getConnectionString().isPresent()) { + Optional connectionStringOpt = azureConfig.getConnectionString(); + if (connectionStringOpt.isPresent()) { logger.debug("Using connection string for authentication"); - builder.connectionString(azureConfig.getConnectionString().get()); + builder.connectionString(connectionStringOpt.get()); return builder.buildAsyncClient(); } // Option 2: Account Key (shared key credential) - if (azureConfig.getAccountKey().isPresent()) { + Optional accountKeyOpt = azureConfig.getAccountKey(); + if (accountKeyOpt.isPresent()) { logger.debug("Using account key for authentication"); StorageSharedKeyCredential credential = - new StorageSharedKeyCredential( - azureConfig.getAccountName(), azureConfig.getAccountKey().get()); + new StorageSharedKeyCredential(azureConfig.getAccountName(), accountKeyOpt.get()); builder.credential(credential); return builder.buildAsyncClient(); } // Option 3: Service Principal (client secret credential) - if (azureConfig.getTenantId().isPresent() - && azureConfig.getClientId().isPresent() - && azureConfig.getClientSecret().isPresent()) { + Optional tenantIdOpt = azureConfig.getTenantId(); + Optional clientIdOpt = azureConfig.getClientId(); + Optional clientSecretOpt = azureConfig.getClientSecret(); + if (tenantIdOpt.isPresent() && clientIdOpt.isPresent() && clientSecretOpt.isPresent()) { logger.debug("Using service principal (client secret) for authentication"); ClientSecretCredential credential = new ClientSecretCredentialBuilder() - .tenantId(azureConfig.getTenantId().get()) - .clientId(azureConfig.getClientId().get()) - .clientSecret(azureConfig.getClientSecret().get()) + 
.tenantId(tenantIdOpt.get()) + .clientId(clientIdOpt.get()) + .clientSecret(clientSecretOpt.get()) .build(); builder.credential(credential); return builder.buildAsyncClient(); } // Option 4: Managed Identity (tenantId + clientId, no secret) - if (azureConfig.getTenantId().isPresent() && azureConfig.getClientId().isPresent()) { + if (tenantIdOpt.isPresent() && clientIdOpt.isPresent()) { logger.debug("Using managed identity for authentication"); DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() - .tenantId(azureConfig.getTenantId().get()) - .managedIdentityClientId(azureConfig.getClientId().get()) + .tenantId(tenantIdOpt.get()) + .managedIdentityClientId(clientIdOpt.get()) .build(); builder.credential(credential); return builder.buildAsyncClient(); From ef040cb11a9df6a213b68c5124e5c4a050b561b5 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 09:05:48 +0530 Subject: [PATCH 07/17] Add unit tests --- .../storage/AzureAsyncStorageClient.java | 56 +-- .../java/ai/onehouse/TestRuntimeModule.java | 59 ++- .../storage/AzureAsyncStorageClientTest.java | 405 ++++++++++++++++++ .../ai/onehouse/storage/StorageUtilsTest.java | 62 ++- .../AzureStorageClientProviderTest.java | 253 +++++++++++ 5 files changed, 798 insertions(+), 37 deletions(-) create mode 100644 lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java create mode 100644 lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java diff --git a/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java b/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java index 07f93a58..05c54f85 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java +++ b/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java @@ -73,33 +73,34 @@ public CompletableFuture>> fetchObjectsByPage( String nextContinuationToken = null; // Get single page with continuation token - 
PagedResponse page = + try (PagedResponse page = StringUtils.isNotBlank(continuationToken) ? pagedFlux.byPage(continuationToken).blockFirst() - : pagedFlux.byPage().blockFirst(); - - if (page != null) { - // Process items in the page - page.getElements() - .forEach( - blobItem -> { - String blobName = blobItem.getName(); - boolean isDirectory = blobItem.isPrefix() != null && blobItem.isPrefix(); - String fileName = blobName.replaceFirst("^" + prefix, ""); - - files.add( - File.builder() - .filename(fileName) - .lastModifiedAt( - isDirectory - ? Instant.EPOCH - : blobItem.getProperties().getLastModified().toInstant()) - .isDirectory(isDirectory) - .build()); - }); - - // Get continuation token for next page - nextContinuationToken = page.getContinuationToken(); + : pagedFlux.byPage().blockFirst()) { + + if (page != null) { + // Process items in the page + page.getElements() + .forEach( + blobItem -> { + String blobName = blobItem.getName(); + boolean isDirectory = blobItem.isPrefix() != null && blobItem.isPrefix(); + String fileName = blobName.replaceFirst("^" + prefix, ""); + + files.add( + File.builder() + .filename(fileName) + .lastModifiedAt( + isDirectory + ? 
Instant.EPOCH + : blobItem.getProperties().getLastModified().toInstant()) + .isDirectory(isDirectory) + .build()); + }); + + // Get continuation token for next page + nextContinuationToken = page.getContinuationToken(); + } } return Pair.of(nextContinuationToken, files); @@ -179,8 +180,9 @@ protected RuntimeException clientException(Throwable ex, String operation, Strin } // Map to NoSuchKeyException - if (errorCode == BlobErrorCode.BLOB_NOT_FOUND - || errorCode == BlobErrorCode.CONTAINER_NOT_FOUND) { + if (errorCode != null + && (errorCode == BlobErrorCode.BLOB_NOT_FOUND + || errorCode == BlobErrorCode.CONTAINER_NOT_FOUND)) { return new NoSuchKeyException( String.format("NoSuchKey for operation: %s on path: %s", operation, path)); } diff --git a/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java b/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java index 5c5430ba..2c1f5b04 100644 --- a/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java +++ b/lakeview/src/test/java/ai/onehouse/TestRuntimeModule.java @@ -10,6 +10,7 @@ import ai.onehouse.api.AsyncHttpClientWithRetry; import ai.onehouse.config.Config; +import ai.onehouse.config.models.common.AzureConfig; import ai.onehouse.config.models.common.FileSystemConfiguration; import ai.onehouse.config.models.common.GCSConfig; import ai.onehouse.config.models.common.S3Config; @@ -119,6 +120,10 @@ void testProvidesAsyncStorageClient(FileSystem fileSystemType) { S3Config mockS3Config = mock(S3Config.class); when(mockFileSystemConfiguration.getS3Config()).thenReturn(mockS3Config); when(mockS3AsyncClientProvider.getS3AsyncClient()).thenReturn(null); + } else if (FileSystem.AZURE.equals(fileSystemType)) { + AzureConfig mockAzureConfig = mock(AzureConfig.class); + when(mockFileSystemConfiguration.getS3Config()).thenReturn(null); + when(mockFileSystemConfiguration.getAzureConfig()).thenReturn(mockAzureConfig); } else { GCSConfig mockGcsConfig = mock(GCSConfig.class); 
when(mockFileSystemConfiguration.getGcsConfig()).thenReturn(mockGcsConfig); @@ -135,6 +140,8 @@ void testProvidesAsyncStorageClient(FileSystem fileSystemType) { mockExecutorService); if (FileSystem.S3.equals(fileSystemType)) { assertInstanceOf(S3AsyncStorageClient.class, asyncStorageClientForDiscovery); + } else if (FileSystem.AZURE.equals(fileSystemType)) { + assertInstanceOf(AzureAsyncStorageClient.class, asyncStorageClientForDiscovery); } else { assertInstanceOf(GCSAsyncStorageClient.class, asyncStorageClientForDiscovery); } @@ -149,6 +156,8 @@ void testProvidesAsyncStorageClient(FileSystem fileSystemType) { mockExecutorService); if (FileSystem.S3.equals(fileSystemType)) { Assertions.assertInstanceOf(S3AsyncStorageClient.class, asyncStorageClientForUpload); + } else if (FileSystem.AZURE.equals(fileSystemType)) { + Assertions.assertInstanceOf(AzureAsyncStorageClient.class, asyncStorageClientForUpload); } else { Assertions.assertInstanceOf(GCSAsyncStorageClient.class, asyncStorageClientForUpload); } @@ -177,6 +186,7 @@ static class GuiceTestModule extends AbstractModule { private final StorageUtils storageUtils; private final S3AsyncClientProvider s3Provider; private final GcsClientProvider gcsProvider; + private final AzureStorageClientProvider azureProvider; private final ExecutorService executorService; private final Metrics metrics; private final LakeViewExtractorMetrics lakeViewExtractorMetrics; @@ -184,13 +194,15 @@ static class GuiceTestModule extends AbstractModule { private final OnehouseApiClient onehouseApiClient; GuiceTestModule(Config config, StorageUtils storageUtils, S3AsyncClientProvider s3Provider, - GcsClientProvider gcsProvider, ExecutorService executorService, - Metrics metrics, LakeViewExtractorMetrics lakeViewExtractorMetrics, + GcsClientProvider gcsProvider, AzureStorageClientProvider azureProvider, + ExecutorService executorService, Metrics metrics, + LakeViewExtractorMetrics lakeViewExtractorMetrics, AsyncHttpClientWithRetry httpClient, 
OnehouseApiClient onehouseApiClient) { this.config = config; this.storageUtils = storageUtils; this.s3Provider = s3Provider; this.gcsProvider = gcsProvider; + this.azureProvider = azureProvider; this.executorService = executorService; this.metrics = metrics; this.lakeViewExtractorMetrics = lakeViewExtractorMetrics; @@ -205,6 +217,7 @@ protected void configure() { bind(StorageUtils.class).toInstance(storageUtils); bind(S3AsyncClientProvider.class).toInstance(s3Provider); bind(GcsClientProvider.class).toInstance(gcsProvider); + bind(AzureStorageClientProvider.class).toInstance(azureProvider); bind(ExecutorService.class).toInstance(executorService); bind(Metrics.class).toInstance(metrics); bind(LakeViewExtractorMetrics.class).toInstance(lakeViewExtractorMetrics); @@ -213,7 +226,7 @@ protected void configure() { } @Test - void testGuiceBootstrapping_S3_and_GCS() { + void testGuiceBootstrapping_S3_Azure_and_GCS() { // S3 setup FileSystemConfiguration mockFsConfig = mock(FileSystemConfiguration.class); S3Config mockS3Config = mock(S3Config.class); @@ -227,6 +240,7 @@ void testGuiceBootstrapping_S3_and_GCS() { mock(StorageUtils.class), mock(S3AsyncClientProvider.class), mock(GcsClientProvider.class), + mock(AzureStorageClientProvider.class), mock(ExecutorService.class), mock(Metrics.class), mock(LakeViewExtractorMetrics.class), @@ -241,6 +255,34 @@ void testGuiceBootstrapping_S3_and_GCS() { Key.get(AsyncStorageClient.class, RuntimeModule.TableDiscoveryObjectStorageAsyncClient.class)); Assertions.assertInstanceOf(S3AsyncStorageClient.class, s3ClientDiscovery); + // Azure setup + FileSystemConfiguration mockFsConfigAzure = mock(FileSystemConfiguration.class); + AzureConfig mockAzureConfig = mock(AzureConfig.class); + when(mockConfig.getFileSystemConfiguration()).thenReturn(mockFsConfigAzure); + when(mockFsConfigAzure.getS3Config()).thenReturn(null); + when(mockFsConfigAzure.getAzureConfig()).thenReturn(mockAzureConfig); + + Injector injectorAzure = Guice.createInjector( + 
Modules.override(new RuntimeModule(mockConfig)) + .with(new GuiceTestModule( + mockConfig, + mock(StorageUtils.class), + mock(S3AsyncClientProvider.class), + mock(GcsClientProvider.class), + mock(AzureStorageClientProvider.class), + mock(ExecutorService.class), + mock(Metrics.class), + mock(LakeViewExtractorMetrics.class), + mock(AsyncHttpClientWithRetry.class), + mock(OnehouseApiClient.class))) + ); + + AsyncStorageClient azureClientUpload = injectorAzure.getInstance( + Key.get(AsyncStorageClient.class, RuntimeModule.TableMetadataUploadObjectStorageAsyncClient.class)); + Assertions.assertInstanceOf(AzureAsyncStorageClient.class, azureClientUpload); + AsyncStorageClient azureClientDiscovery = injectorAzure.getInstance( + Key.get(AsyncStorageClient.class, RuntimeModule.TableDiscoveryObjectStorageAsyncClient.class)); + Assertions.assertInstanceOf(AzureAsyncStorageClient.class, azureClientDiscovery); // GCS setup FileSystemConfiguration mockFsConfigGcs = mock(FileSystemConfiguration.class); @@ -257,8 +299,8 @@ void testGuiceBootstrapping_S3_and_GCS() { Modules.override(new RuntimeModule(mockConfig)) .with(new GuiceTestModule( mockConfig, mock(StorageUtils.class), mock(S3AsyncClientProvider.class), - mock(GcsClientProvider.class), mock(ExecutorService.class), - mockMetrics, mockLakeViewExtractorMetrics, + mock(GcsClientProvider.class), mock(AzureStorageClientProvider.class), + mock(ExecutorService.class), mockMetrics, mockLakeViewExtractorMetrics, mockHttpClient, mockOnehouseApiClient)) ); @@ -279,6 +321,12 @@ mockConfig, mock(StorageUtils.class), mock(S3AsyncClientProvider.class), assertNotNull(injectorS3.getInstance(TableDiscoveryService.class)); assertNotNull(injectorS3.getInstance(TimelineCommitInstantsUploader.class)); assertNotNull(injectorS3.getInstance(PresignedUrlFileUploader.class)); + assertNotNull(injectorAzure.getInstance(AzureStorageClientProvider.class)); + assertNotNull(injectorAzure.getInstance(HoodiePropertiesReader.class)); + 
assertNotNull(injectorAzure.getInstance(TableDiscoveryAndUploadJob.class)); + assertNotNull(injectorAzure.getInstance(TableDiscoveryService.class)); + assertNotNull(injectorAzure.getInstance(TimelineCommitInstantsUploader.class)); + assertNotNull(injectorAzure.getInstance(PresignedUrlFileUploader.class)); assertNotNull(injectorGcs.getInstance(GcsClientProvider.class)); assertNotNull(injectorGcs.getInstance(HoodiePropertiesReader.class)); assertNotNull(injectorGcs.getInstance(TableDiscoveryAndUploadJob.class)); @@ -289,6 +337,7 @@ mockConfig, mock(StorageUtils.class), mock(S3AsyncClientProvider.class), enum FileSystem { S3, + AZURE, GCS } } diff --git a/lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java b/lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java new file mode 100644 index 00000000..d74f9ede --- /dev/null +++ b/lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java @@ -0,0 +1,405 @@ +package ai.onehouse.storage; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +import ai.onehouse.exceptions.AccessDeniedException; +import ai.onehouse.exceptions.NoSuchKeyException; +import ai.onehouse.exceptions.ObjectStorageClientException; +import ai.onehouse.exceptions.RateLimitException; +import ai.onehouse.storage.models.File; +import ai.onehouse.storage.models.FileStreamData; +import ai.onehouse.storage.providers.AzureStorageClientProvider; +import com.azure.core.http.rest.PagedFlux; +import com.azure.core.http.rest.PagedResponse; +import com.azure.core.util.BinaryData; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobContainerAsyncClient; +import com.azure.storage.blob.BlobServiceAsyncClient; +import com.azure.storage.blob.models.BlobErrorCode; +import 
com.azure.core.util.IterableStream; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobItemProperties; +import com.azure.storage.blob.models.BlobStorageException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@ExtendWith(MockitoExtension.class) +class AzureAsyncStorageClientTest { + + @Mock private AzureStorageClientProvider mockAzureStorageClientProvider; + @Mock private StorageUtils mockStorageUtils; + @Mock private BlobServiceAsyncClient mockBlobServiceAsyncClient; + @Mock private BlobContainerAsyncClient mockContainerAsyncClient; + @Mock private BlobAsyncClient mockBlobAsyncClient; + @Mock private PagedFlux mockPagedFlux; + @Mock private PagedResponse mockPagedResponse1; + @Mock private PagedResponse mockPagedResponse2; + @Mock private BlobItem mockBlobItem1; + @Mock private BlobItem mockBlobItem2; + @Mock private BlobItemProperties mockBlobItemProperties; + + private AzureAsyncStorageClient azureAsyncStorageClient; + private static final String AZURE_URI = + 
"https://testaccount.blob.core.windows.net/test-container/test-blob"; + private static final String TEST_CONTAINER = "test-container"; + private static final String TEST_BLOB = "test-blob"; + + @BeforeEach + void setup() { + lenient() + .when(mockAzureStorageClientProvider.getAzureAsyncClient()) + .thenReturn(mockBlobServiceAsyncClient); + lenient().when(mockStorageUtils.getBucketNameFromUri(AZURE_URI)).thenReturn(TEST_CONTAINER); + lenient().when(mockStorageUtils.getPathFromUrl(AZURE_URI)).thenReturn(TEST_BLOB); + azureAsyncStorageClient = + new AzureAsyncStorageClient( + mockAzureStorageClientProvider, mockStorageUtils, ForkJoinPool.commonPool()); + } + + @Test + void testListAllFilesInDir() throws ExecutionException, InterruptedException { + String fileName = "file1"; + String dirName = "dir1/"; + String continuationToken = "page_2"; + String prefix = TEST_BLOB + "/"; + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + .thenReturn(mockPagedFlux); + when(mockPagedFlux.byPage()).thenReturn(Flux.just(mockPagedResponse1)); + when(mockPagedFlux.byPage(continuationToken)).thenReturn(Flux.just(mockPagedResponse2)); + + // First page + when(mockPagedResponse1.getElements()) + .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); + when(mockPagedResponse1.getContinuationToken()).thenReturn(continuationToken); + when(mockBlobItem1.getName()).thenReturn(prefix + fileName); + when(mockBlobItem1.isPrefix()).thenReturn(null); + when(mockBlobItem1.getProperties()).thenReturn(mockBlobItemProperties); + when(mockBlobItemProperties.getLastModified()).thenReturn(OffsetDateTime.now()); + + // Second page + when(mockPagedResponse2.getElements()) + .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem2))); + when(mockPagedResponse2.getContinuationToken()).thenReturn(null); + when(mockBlobItem2.getName()).thenReturn(prefix + dirName); 
+ when(mockBlobItem2.isPrefix()).thenReturn(true); + + List result = azureAsyncStorageClient.listAllFilesInDir(AZURE_URI).get(); + + assertEquals(2, result.size()); + assertFalse(result.get(0).isDirectory()); + assertEquals(fileName, result.get(0).getFilename()); + assertTrue(result.get(1).isDirectory()); + assertEquals(dirName, result.get(1).getFilename()); + } + + @Test + void testFetchObjectsByPage() throws ExecutionException, InterruptedException { + String fileName = "file1"; + String prefix = "prefix"; + String continuationToken = "token"; + String nextToken = "next-token"; + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + .thenReturn(mockPagedFlux); + when(mockPagedFlux.byPage(continuationToken)) + .thenReturn(Flux.just(mockPagedResponse1)); + + when(mockPagedResponse1.getElements()) + .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); + when(mockPagedResponse1.getContinuationToken()).thenReturn(nextToken); + when(mockBlobItem1.getName()).thenReturn(prefix + "/" + fileName); + when(mockBlobItem1.isPrefix()).thenReturn(null); + when(mockBlobItem1.getProperties()).thenReturn(mockBlobItemProperties); + when(mockBlobItemProperties.getLastModified()).thenReturn(OffsetDateTime.now()); + + Pair> result = + azureAsyncStorageClient + .fetchObjectsByPage(TEST_CONTAINER, prefix, continuationToken, null) + .get(); + + assertEquals(nextToken, result.getLeft()); + assertEquals(1, result.getRight().size()); + assertEquals("/" + fileName, result.getRight().get(0).getFilename()); + } + + @Test + void testFetchObjectsByPageWithoutContinuationToken() + throws ExecutionException, InterruptedException { + String fileName = "file1"; + String prefix = "prefix"; + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + 
when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + .thenReturn(mockPagedFlux); + when(mockPagedFlux.byPage()).thenReturn(Flux.just(mockPagedResponse1)); + + when(mockPagedResponse1.getElements()) + .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); + when(mockPagedResponse1.getContinuationToken()).thenReturn(null); + when(mockBlobItem1.getName()).thenReturn(prefix + "/" + fileName); + when(mockBlobItem1.isPrefix()).thenReturn(null); + when(mockBlobItem1.getProperties()).thenReturn(mockBlobItemProperties); + when(mockBlobItemProperties.getLastModified()).thenReturn(OffsetDateTime.now()); + + Pair> result = + azureAsyncStorageClient.fetchObjectsByPage(TEST_CONTAINER, prefix, null, null).get(); + + assertNull(result.getLeft()); + assertEquals(1, result.getRight().size()); + } + + @Test + void testReadBlob() throws ExecutionException, InterruptedException { + byte[] fileContent = "test content".getBytes(StandardCharsets.UTF_8); + BinaryData binaryData = BinaryData.fromBytes(fileContent); + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); + when(mockBlobAsyncClient.downloadContent()).thenReturn(Mono.just(binaryData)); + + BinaryData result = azureAsyncStorageClient.readBlob(AZURE_URI).get(); + + assertNotNull(result); + assertArrayEquals(fileContent, result.toBytes()); + } + + @Test + void testStreamFileAsync() throws ExecutionException, InterruptedException, IOException { + byte[] fileContent = "test content".getBytes(StandardCharsets.UTF_8); + BinaryData binaryData = BinaryData.fromBytes(fileContent); + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); + 
when(mockBlobAsyncClient.downloadContent()).thenReturn(Mono.just(binaryData)); + + FileStreamData result = azureAsyncStorageClient.streamFileAsync(AZURE_URI).get(); + + assertNotNull(result); + assertEquals(fileContent.length, result.getFileSize()); + + byte[] resultContent = toByteArray(result.getInputStream()); + assertArrayEquals(fileContent, resultContent); + } + + @Test + void testReadFileAsBytes() throws ExecutionException, InterruptedException { + byte[] fileContent = "test content".getBytes(StandardCharsets.UTF_8); + BinaryData binaryData = BinaryData.fromBytes(fileContent); + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); + when(mockBlobAsyncClient.downloadContent()).thenReturn(Mono.just(binaryData)); + + byte[] result = azureAsyncStorageClient.readFileAsBytes(AZURE_URI).get(); + + assertArrayEquals(fileContent, result); + } + + @ParameterizedTest + @MethodSource("generateBlobStorageExceptionTestCases") + void testReadBlobWithBlobStorageException( + BlobStorageException exception, Class expectedExceptionClass) { + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); + when(mockBlobAsyncClient.downloadContent()).thenAnswer(invocation -> Mono.error(exception)); + + CompletableFuture future = azureAsyncStorageClient.readBlob(AZURE_URI); + CompletionException executionException = assertThrows(CompletionException.class, future::join); + + Throwable cause = executionException.getCause(); + assertInstanceOf(expectedExceptionClass, cause); + } + + static Stream generateBlobStorageExceptionTestCases() { + BlobStorageException accessDenied403 = mock(BlobStorageException.class); + when(accessDenied403.getStatusCode()).thenReturn(403); + 
when(accessDenied403.getMessage()).thenReturn("Access denied"); + when(accessDenied403.getErrorCode()).thenReturn(null); + + BlobStorageException unauthorized401 = mock(BlobStorageException.class); + when(unauthorized401.getStatusCode()).thenReturn(401); + when(unauthorized401.getMessage()).thenReturn("Unauthorized"); + when(unauthorized401.getErrorCode()).thenReturn(null); + + BlobStorageException blobNotFound = mock(BlobStorageException.class); + when(blobNotFound.getStatusCode()).thenReturn(404); + when(blobNotFound.getErrorCode()).thenReturn(BlobErrorCode.BLOB_NOT_FOUND); + when(blobNotFound.getMessage()).thenReturn("Blob not found"); + + BlobStorageException containerNotFound = mock(BlobStorageException.class); + when(containerNotFound.getStatusCode()).thenReturn(404); + when(containerNotFound.getErrorCode()).thenReturn(BlobErrorCode.CONTAINER_NOT_FOUND); + when(containerNotFound.getMessage()).thenReturn("Container not found"); + + BlobStorageException tooManyRequests = mock(BlobStorageException.class); + when(tooManyRequests.getStatusCode()).thenReturn(429); + when(tooManyRequests.getMessage()).thenReturn("Too many requests"); + when(tooManyRequests.getErrorCode()).thenReturn(null); + + BlobStorageException serviceUnavailable = mock(BlobStorageException.class); + when(serviceUnavailable.getStatusCode()).thenReturn(503); + when(serviceUnavailable.getMessage()).thenReturn("Service unavailable"); + when(serviceUnavailable.getErrorCode()).thenReturn(null); + + BlobStorageException internalError = mock(BlobStorageException.class); + when(internalError.getStatusCode()).thenReturn(500); + when(internalError.getMessage()).thenReturn("Internal error"); + when(internalError.getErrorCode()).thenReturn(null); + + return Stream.of( + Arguments.of(accessDenied403, AccessDeniedException.class), + Arguments.of(unauthorized401, AccessDeniedException.class), + Arguments.of(blobNotFound, NoSuchKeyException.class), + Arguments.of(containerNotFound, NoSuchKeyException.class), + 
Arguments.of(tooManyRequests, RateLimitException.class), + Arguments.of(serviceUnavailable, RateLimitException.class), + Arguments.of(internalError, ObjectStorageClientException.class)); + } + + @ParameterizedTest + @MethodSource("generateWrappedExceptionTestCases") + void testReadBlobWithWrappedException( + RuntimeException exception, Class expectedExceptionClass) { + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); + when(mockBlobAsyncClient.downloadContent()).thenAnswer(invocation -> Mono.error(exception)); + + CompletableFuture future = azureAsyncStorageClient.readBlob(AZURE_URI); + CompletionException executionException = assertThrows(CompletionException.class, future::join); + + Throwable cause = executionException.getCause(); + assertInstanceOf(expectedExceptionClass, cause); + } + + static Stream generateWrappedExceptionTestCases() { + return Stream.of( + Arguments.of(new AccessDeniedException("error"), AccessDeniedException.class), + Arguments.of(new NoSuchKeyException("error"), NoSuchKeyException.class), + Arguments.of(new RateLimitException("error"), RateLimitException.class)); + } + + @Test + void testFetchObjectsByPageWithException() { + BlobStorageException exception = mock(BlobStorageException.class); + when(exception.getStatusCode()).thenReturn(429); + when(exception.getMessage()).thenReturn("Throttled"); + when(exception.getErrorCode()).thenReturn(null); + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + .thenThrow(exception); + + CompletableFuture>> future = + azureAsyncStorageClient.fetchObjectsByPage(TEST_CONTAINER, "prefix", null, null); + CompletionException executionException = assertThrows(CompletionException.class, future::join); + + 
Throwable cause = executionException.getCause(); + assertInstanceOf(RateLimitException.class, cause); + assertTrue( + cause.getMessage().contains("Throttled by Azure for operation: fetchObjectsByPage")); + } + + @Test + void testRefreshClient() { + azureAsyncStorageClient.refreshClient(); + verify(mockAzureStorageClientProvider, times(1)).refreshClient(); + } + + @Test + void testInitializeClient() { + azureAsyncStorageClient.initializeClient(); + verify(mockAzureStorageClientProvider, times(1)).getAzureAsyncClient(); + } + + @Test + void testReadBlobWithGenericException() { + RuntimeException exception = new RuntimeException("Generic error"); + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); + when(mockBlobAsyncClient.downloadContent()).thenAnswer(invocation -> Mono.error(exception)); + + CompletableFuture future = azureAsyncStorageClient.readBlob(AZURE_URI); + CompletionException executionException = assertThrows(CompletionException.class, future::join); + + Throwable cause = executionException.getCause(); + assertInstanceOf(ObjectStorageClientException.class, cause); + } + + @Test + void testFetchObjectsByPageWithDirectoryItems() throws ExecutionException, InterruptedException { + String dirName = "dir1/"; + String prefix = "prefix"; + + when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) + .thenReturn(mockContainerAsyncClient); + when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + .thenReturn(mockPagedFlux); + when(mockPagedFlux.byPage()).thenReturn(Flux.just(mockPagedResponse1)); + + when(mockPagedResponse1.getElements()) + .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); + when(mockPagedResponse1.getContinuationToken()).thenReturn(null); + when(mockBlobItem1.getName()).thenReturn(prefix + dirName); + 
when(mockBlobItem1.isPrefix()).thenReturn(true); + + Pair> result = + azureAsyncStorageClient.fetchObjectsByPage(TEST_CONTAINER, prefix, null, null).get(); + + assertEquals(1, result.getRight().size()); + assertTrue(result.getRight().get(0).isDirectory()); + assertEquals(dirName, result.getRight().get(0).getFilename()); + assertEquals(Instant.EPOCH, result.getRight().get(0).getLastModifiedAt()); + } + + private static byte[] toByteArray(InputStream is) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + baos.write(buffer, 0, bytesRead); + } + return baos.toByteArray(); + } + } + +} diff --git a/lakeview/src/test/java/ai/onehouse/storage/StorageUtilsTest.java b/lakeview/src/test/java/ai/onehouse/storage/StorageUtilsTest.java index 4fb326c3..bcfad568 100644 --- a/lakeview/src/test/java/ai/onehouse/storage/StorageUtilsTest.java +++ b/lakeview/src/test/java/ai/onehouse/storage/StorageUtilsTest.java @@ -11,8 +11,20 @@ class StorageUtilsTest { void testGetPathFromUrl() { assertEquals("path/to/file", storageUtils.getPathFromUrl("s3://bucket/path/to/file")); assertEquals("path/to/file", storageUtils.getPathFromUrl("gs://bucket/path/to/file")); + assertEquals( + "path/to/file", + storageUtils.getPathFromUrl( + "https://account.blob.core.windows.net/container/path/to/file")); + assertEquals( + "path/to/file", + storageUtils.getPathFromUrl( + "https://account.dfs.core.windows.net/container/path/to/file")); assertEquals("", storageUtils.getPathFromUrl("s3://bucket")); assertEquals("", storageUtils.getPathFromUrl("gs://bucket")); + assertEquals( + "", storageUtils.getPathFromUrl("https://account.blob.core.windows.net/container")); + assertEquals( + "", storageUtils.getPathFromUrl("https://account.dfs.core.windows.net/container")); assertThrows(IllegalArgumentException.class, () -> storageUtils.getPathFromUrl("invalidUri")); } @@ -20,21 +32,45 
@@ void testGetPathFromUrl() { void testConstructFileUri() { String s3DirUriWithoutTrailingSlash = "s3://bucket/dir1"; String s3DirUriWithTrailingSlash = "s3://bucket/dir1/"; + String azureDirUriWithoutTrailingSlash = + "https://account.blob.core.windows.net/container/dir1"; + String azureDirUriWithTrailingSlash = + "https://account.blob.core.windows.net/container/dir1/"; String filePathWithoutPrefixSlash = "file.txt"; String filePathWithPrefixSlash = "/file.txt"; - String expectedFileUri = s3DirUriWithTrailingSlash + filePathWithoutPrefixSlash; + String expectedS3FileUri = s3DirUriWithTrailingSlash + filePathWithoutPrefixSlash; + String expectedAzureFileUri = azureDirUriWithTrailingSlash + filePathWithoutPrefixSlash; + + // S3 tests assertEquals( - expectedFileUri, + expectedS3FileUri, storageUtils.constructFileUri(s3DirUriWithoutTrailingSlash, filePathWithoutPrefixSlash)); assertEquals( - expectedFileUri, + expectedS3FileUri, storageUtils.constructFileUri(s3DirUriWithTrailingSlash, filePathWithoutPrefixSlash)); assertEquals( - expectedFileUri, + expectedS3FileUri, storageUtils.constructFileUri(s3DirUriWithoutTrailingSlash, filePathWithPrefixSlash)); assertEquals( - expectedFileUri, + expectedS3FileUri, storageUtils.constructFileUri(s3DirUriWithTrailingSlash, filePathWithPrefixSlash)); + + // Azure tests + assertEquals( + expectedAzureFileUri, + storageUtils.constructFileUri( + azureDirUriWithoutTrailingSlash, filePathWithoutPrefixSlash)); + assertEquals( + expectedAzureFileUri, + storageUtils.constructFileUri(azureDirUriWithTrailingSlash, filePathWithoutPrefixSlash)); + assertEquals( + expectedAzureFileUri, + storageUtils.constructFileUri(azureDirUriWithoutTrailingSlash, filePathWithPrefixSlash)); + assertEquals( + expectedAzureFileUri, + storageUtils.constructFileUri(azureDirUriWithTrailingSlash, filePathWithPrefixSlash)); + + // Edge cases assertEquals( filePathWithPrefixSlash, storageUtils.constructFileUri("", filePathWithoutPrefixSlash)); assertEquals( @@ 
-49,6 +85,22 @@ void testConstructFileUri() { void testGetBucketNameFromUri() { assertEquals("bucket", storageUtils.getBucketNameFromUri("s3://bucket/path/to/file")); assertEquals("bucket", storageUtils.getBucketNameFromUri("gs://bucket/path/to/file")); + assertEquals( + "container", + storageUtils.getBucketNameFromUri( + "https://account.blob.core.windows.net/container/path/to/file")); + assertEquals( + "container", + storageUtils.getBucketNameFromUri( + "https://account.dfs.core.windows.net/container/path/to/file")); + assertEquals("bucket", storageUtils.getBucketNameFromUri("s3://bucket")); + assertEquals("bucket", storageUtils.getBucketNameFromUri("gs://bucket")); + assertEquals( + "container", + storageUtils.getBucketNameFromUri("https://account.blob.core.windows.net/container")); + assertEquals( + "container", + storageUtils.getBucketNameFromUri("https://account.dfs.core.windows.net/container")); assertThrows( IllegalArgumentException.class, () -> storageUtils.getBucketNameFromUri("invalidUri")); } diff --git a/lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java b/lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java new file mode 100644 index 00000000..2deaa006 --- /dev/null +++ b/lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java @@ -0,0 +1,253 @@ +package ai.onehouse.storage.providers; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import ai.onehouse.config.models.common.AzureConfig; +import ai.onehouse.config.models.common.FileSystemConfiguration; +import ai.onehouse.config.models.configv1.ConfigV1; +import com.azure.identity.ClientSecretCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.DefaultAzureCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; +import 
com.azure.storage.blob.BlobServiceAsyncClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class AzureStorageClientProviderTest { + @Mock private ConfigV1 config; + @Mock private FileSystemConfiguration fileSystemConfiguration; + @Mock private AzureConfig azureConfig; + @Mock private BlobServiceAsyncClient mockBlobServiceAsyncClient; + + @BeforeEach + void setup() { + when(config.getFileSystemConfiguration()).thenReturn(fileSystemConfiguration); + when(fileSystemConfiguration.getAzureConfig()).thenReturn(azureConfig); + } + + @Test + void throwExceptionWhenAzureConfigIsNull() { + when(fileSystemConfiguration.getAzureConfig()).thenReturn(null); + AzureStorageClientProvider clientProvider = new AzureStorageClientProvider(config); + + IllegalArgumentException thrown = + assertThrows(IllegalArgumentException.class, clientProvider::createAzureAsyncClient); + + assertEquals("Azure Config not found", thrown.getMessage()); + } + + @Test + void throwExceptionWhenAccountNameIsBlank() { + when(azureConfig.getAccountName()).thenReturn(""); + + AzureStorageClientProvider clientProvider = new AzureStorageClientProvider(config); + IllegalArgumentException thrown = + assertThrows(IllegalArgumentException.class, clientProvider::createAzureAsyncClient); + + assertEquals("Azure storage account name cannot be empty", thrown.getMessage()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void 
testInstantiateAzureClientWithConnectionString(boolean isRefreshClient) { + when(azureConfig.getAccountName()).thenReturn("testaccount"); + when(azureConfig.getConnectionString()) + .thenReturn( + Optional.of( + "DefaultEndpointsProtocol=https;AccountName=testaccount;AccountKey=key;EndpointSuffix=core.windows.net")); + + try (MockedConstruction mockedBuilder = + mockConstruction( + BlobServiceClientBuilder.class, + (mock, context) -> { + when(mock.endpoint(anyString())).thenReturn(mock); + when(mock.connectionString(anyString())).thenReturn(mock); + when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + })) { + + AzureStorageClientProvider azureClientProviderSpy = + Mockito.spy(new AzureStorageClientProvider(config)); + AzureStorageClientProvider.resetAzureAsyncClient(); + + if (!isRefreshClient) { + BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + assertNotNull(result); + } else { + azureClientProviderSpy.refreshClient(); + } + + verify(azureClientProviderSpy, times(1)).createAzureAsyncClient(); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testInstantiateAzureClientWithAccountKey(boolean isRefreshClient) { + when(azureConfig.getAccountName()).thenReturn("testaccount"); + when(azureConfig.getConnectionString()).thenReturn(Optional.empty()); + when(azureConfig.getAccountKey()).thenReturn(Optional.of("dGVzdGFjY291bnRrZXk=")); + + try (MockedConstruction mockedBuilder = + mockConstruction( + BlobServiceClientBuilder.class, + (mock, context) -> { + when(mock.endpoint(anyString())).thenReturn(mock); + when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock); + when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + }); + MockedConstruction mockedCredential = + mockConstruction(StorageSharedKeyCredential.class)) { + + AzureStorageClientProvider azureClientProviderSpy = + Mockito.spy(new AzureStorageClientProvider(config)); + 
AzureStorageClientProvider.resetAzureAsyncClient(); + + if (!isRefreshClient) { + BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + assertNotNull(result); + } else { + azureClientProviderSpy.refreshClient(); + } + + verify(azureClientProviderSpy, times(1)).createAzureAsyncClient(); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testInstantiateAzureClientWithServicePrincipal(boolean isRefreshClient) { + when(azureConfig.getAccountName()).thenReturn("testaccount"); + when(azureConfig.getConnectionString()).thenReturn(Optional.empty()); + when(azureConfig.getAccountKey()).thenReturn(Optional.empty()); + when(azureConfig.getTenantId()).thenReturn(Optional.of("test-tenant-id")); + when(azureConfig.getClientId()).thenReturn(Optional.of("test-client-id")); + when(azureConfig.getClientSecret()).thenReturn(Optional.of("test-client-secret")); + + try (MockedConstruction mockedBuilder = + mockConstruction( + BlobServiceClientBuilder.class, + (mock, context) -> { + when(mock.endpoint(anyString())).thenReturn(mock); + when(mock.credential(any(ClientSecretCredential.class))).thenReturn(mock); + when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + }); + MockedConstruction mockedCredBuilder = + mockConstruction( + ClientSecretCredentialBuilder.class, + (mock, context) -> { + when(mock.tenantId(anyString())).thenReturn(mock); + when(mock.clientId(anyString())).thenReturn(mock); + when(mock.clientSecret(anyString())).thenReturn(mock); + when(mock.build()).thenReturn(Mockito.mock(ClientSecretCredential.class)); + })) { + + AzureStorageClientProvider azureClientProviderSpy = + Mockito.spy(new AzureStorageClientProvider(config)); + AzureStorageClientProvider.resetAzureAsyncClient(); + + if (!isRefreshClient) { + BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + assertNotNull(result); + } else { + azureClientProviderSpy.refreshClient(); + } + + verify(azureClientProviderSpy, 
times(1)).createAzureAsyncClient(); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testInstantiateAzureClientWithManagedIdentity(boolean isRefreshClient) { + when(azureConfig.getAccountName()).thenReturn("testaccount"); + when(azureConfig.getConnectionString()).thenReturn(Optional.empty()); + when(azureConfig.getAccountKey()).thenReturn(Optional.empty()); + when(azureConfig.getTenantId()).thenReturn(Optional.of("test-tenant-id")); + when(azureConfig.getClientId()).thenReturn(Optional.of("test-client-id")); + when(azureConfig.getClientSecret()).thenReturn(Optional.empty()); + + try (MockedConstruction mockedBuilder = + mockConstruction( + BlobServiceClientBuilder.class, + (mock, context) -> { + when(mock.endpoint(anyString())).thenReturn(mock); + when(mock.credential(any(DefaultAzureCredential.class))).thenReturn(mock); + when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + }); + MockedConstruction mockedCredBuilder = + mockConstruction( + DefaultAzureCredentialBuilder.class, + (mock, context) -> { + when(mock.tenantId(anyString())).thenReturn(mock); + when(mock.managedIdentityClientId(anyString())).thenReturn(mock); + when(mock.build()).thenReturn(Mockito.mock(DefaultAzureCredential.class)); + })) { + + AzureStorageClientProvider azureClientProviderSpy = + Mockito.spy(new AzureStorageClientProvider(config)); + AzureStorageClientProvider.resetAzureAsyncClient(); + + if (!isRefreshClient) { + BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + assertNotNull(result); + } else { + azureClientProviderSpy.refreshClient(); + } + + verify(azureClientProviderSpy, times(1)).createAzureAsyncClient(); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testInstantiateAzureClientWithDefaultCredential(boolean isRefreshClient) { + when(azureConfig.getAccountName()).thenReturn("testaccount"); + when(azureConfig.getConnectionString()).thenReturn(Optional.empty()); + 
when(azureConfig.getAccountKey()).thenReturn(Optional.empty()); + when(azureConfig.getTenantId()).thenReturn(Optional.empty()); + when(azureConfig.getClientId()).thenReturn(Optional.empty()); + + try (MockedConstruction mockedBuilder = + mockConstruction( + BlobServiceClientBuilder.class, + (mock, context) -> { + when(mock.endpoint(anyString())).thenReturn(mock); + when(mock.credential(any(DefaultAzureCredential.class))).thenReturn(mock); + when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + }); + MockedConstruction mockedCredBuilder = + mockConstruction( + DefaultAzureCredentialBuilder.class, + (mock, context) -> { + when(mock.build()).thenReturn(Mockito.mock(DefaultAzureCredential.class)); + })) { + + AzureStorageClientProvider azureClientProviderSpy = + Mockito.spy(new AzureStorageClientProvider(config)); + AzureStorageClientProvider.resetAzureAsyncClient(); + + if (!isRefreshClient) { + BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + assertNotNull(result); + } else { + azureClientProviderSpy.refreshClient(); + } + + verify(azureClientProviderSpy, times(1)).createAzureAsyncClient(); + } + } +} From 8dd1f3f7a831d2a57ee9148ab066d3aa7f1555ba Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 09:22:14 +0530 Subject: [PATCH 08/17] Fix comment --- .../ai/onehouse/storage/StorageUtils.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java index 255dfb53..d6e488bc 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java +++ b/lakeview/src/main/java/ai/onehouse/storage/StorageUtils.java @@ -12,10 +12,14 @@ public class StorageUtils { /** * Group 3 extracts the path portion after the bucket/container name from the URI. 
* Examples: - * s3://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt - * gs://my-bucket/path/to/file.txt -> group(3) = /path/to/file.txt - * https://account.blob.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt - * https://account.dfs.core.windows.net/container/path/to/file.txt -> group(3) = /path/to/file.txt + *
<ul>
+ * <li>s3://my-bucket/path/to/file.txt returns /path/to/file.txt</li>
+ * <li>gs://my-bucket/path/to/file.txt returns /path/to/file.txt</li>
+ * <li>https://account.blob.core.windows.net/container/path/to/file.txt returns /path/to/file.txt</li>
+ * <li>https://account.dfs.core.windows.net/container/path/to/file.txt returns /path/to/file.txt</li>
+ * </ul>
+ * @param uri the storage URI to parse + * @return the path portion of the URI, or empty string if no path */ public String getPathFromUrl(String uri) { @@ -40,10 +44,14 @@ public String constructFileUri(String directoryUri, String filePath) { /** * Group 2 extracts the bucket/container name from the URI. * Examples: - * s3://my-bucket-s3/path/to/file.txt -> group(2) = my-bucket-s3 - * gs://my-bucket-gs/path/to/file.txt -> group(2) = my-bucket-gs - * https://account.blob.core.windows.net/container/path/file.txt -> group(2) = container - * https://account.dfs.core.windows.net/container/path/file.txt -> group(2) = container + *
<ul>
+ * <li>s3://my-bucket-s3/path/to/file.txt returns my-bucket-s3</li>
+ * <li>gs://my-bucket-gs/path/to/file.txt returns my-bucket-gs</li>
+ * <li>https://account.blob.core.windows.net/container/path/file.txt returns container</li>
+ * <li>https://account.dfs.core.windows.net/container/path/file.txt returns container</li>
+ * </ul>
+ * @param uri the storage URI to parse + * @return the bucket or container name */ public String getBucketNameFromUri(String uri) { Matcher matcher = OBJECT_STORAGE_URI_PATTERN.matcher(uri); From d938d2ea5f7557930488c48dc3fcd194387a6e19 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 12:39:14 +0530 Subject: [PATCH 09/17] Fix configs --- .../onehouse/config/models/common/AzureConfig.java | 5 +---- .../providers/AzureStorageClientProvider.java | 14 +------------- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java b/lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java index 4ba1e31d..4a5e4114 100644 --- a/lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java +++ b/lakeview/src/main/java/ai/onehouse/config/models/common/AzureConfig.java @@ -15,17 +15,14 @@ public class AzureConfig { @NonNull private String accountName; // Optional authentication methods - // Option 1: Account Key (for dev/testing, never expires) @Builder.Default private Optional accountKey = Optional.empty(); // Option 2: Connection String (alternative to account key) @Builder.Default private Optional connectionString = Optional.empty(); - // Option 3: Managed Identity or Service Principal + // Option 3: Service Principal @Builder.Default private Optional tenantId = Optional.empty(); @Builder.Default private Optional clientId = Optional.empty(); - - // For Service Principal authentication (Pull Model support) @Builder.Default private Optional clientSecret = Optional.empty(); } diff --git a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java index 64e6779e..5a25d238 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java +++ 
b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java @@ -72,19 +72,7 @@ protected BlobServiceAsyncClient createAzureAsyncClient() { return builder.buildAsyncClient(); } - // Option 4: Managed Identity (tenantId + clientId, no secret) - if (tenantIdOpt.isPresent() && clientIdOpt.isPresent()) { - logger.debug("Using managed identity for authentication"); - DefaultAzureCredential credential = - new DefaultAzureCredentialBuilder() - .tenantId(tenantIdOpt.get()) - .managedIdentityClientId(clientIdOpt.get()) - .build(); - builder.credential(credential); - return builder.buildAsyncClient(); - } - - // Option 5: Default Azure Credential (fallback to environment-based auth) + // Option 4: Default Azure Credential (fallback to environment-based auth) logger.debug("Using default Azure credential chain for authentication"); DefaultAzureCredential credential = new DefaultAzureCredentialBuilder().build(); builder.credential(credential); From e716522332d8165093c10e6954443dee8acf548e Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 13:09:19 +0530 Subject: [PATCH 10/17] Use dfs in endpoint instead of blob --- .../onehouse/storage/providers/AzureStorageClientProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java index 5a25d238..6c3d4f9b 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java +++ b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java @@ -35,7 +35,7 @@ protected BlobServiceAsyncClient createAzureAsyncClient() { validateAzureConfig(azureConfig); BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); - String endpoint = String.format("https://%s.blob.core.windows.net", azureConfig.getAccountName()); + String endpoint = 
String.format("https://%s.dfs.core.windows.net", azureConfig.getAccountName()); builder.endpoint(endpoint); // Option 1: Connection String (includes account key and endpoint) From 2bedb796caf3fb48164026a96d16eac98124ee89 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 14:05:22 +0530 Subject: [PATCH 11/17] Fix library versions --- lakeview/build.gradle | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lakeview/build.gradle b/lakeview/build.gradle index 06d6633c..ab9453bb 100644 --- a/lakeview/build.gradle +++ b/lakeview/build.gradle @@ -61,9 +61,8 @@ dependencies { exclude group: "com.google.protobuf", module: "protobuf-java-utils" } - implementation platform('com.azure:azure-sdk-bom:1.2.25') - implementation 'com.azure:azure-storage-blob' - implementation 'com.azure:azure-identity' + implementation 'com.azure:azure-storage-blob:12.20.2' + implementation 'com.azure:azure-identity:1.7.3' implementation 'org.springframework:spring-beans:5.3.23' From 949ae890ede478c96d7234d8137cf6742c122013 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 14:29:45 +0530 Subject: [PATCH 12/17] Add debugging logs in azure client --- lakeview/src/main/resources/logback.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lakeview/src/main/resources/logback.xml b/lakeview/src/main/resources/logback.xml index ef1095dc..dc0e03d9 100644 --- a/lakeview/src/main/resources/logback.xml +++ b/lakeview/src/main/resources/logback.xml @@ -13,6 +13,8 @@ + + From 560148320fd4dbefbcd2fd651ad28ab69afec6af Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 14:40:47 +0530 Subject: [PATCH 13/17] Update dependency to latest stable --- lakeview/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lakeview/build.gradle b/lakeview/build.gradle index ab9453bb..2ca34109 100644 --- a/lakeview/build.gradle +++ b/lakeview/build.gradle @@ -61,8 +61,8 @@ dependencies { exclude group:
"com.google.protobuf", module: "protobuf-java-utils" } - implementation 'com.azure:azure-storage-blob:12.20.2' - implementation 'com.azure:azure-identity:1.7.3' + implementation 'com.azure:azure-storage-blob:12.30.0' + implementation 'com.azure:azure-identity:1.18.1' implementation 'org.springframework:spring-beans:5.3.23' From 948d83470632fa5aff7a8b0de4a48758f8dbf809 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 14:49:28 +0530 Subject: [PATCH 14/17] Fix lib --- lakeview/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lakeview/build.gradle b/lakeview/build.gradle index 2ca34109..03bd5398 100644 --- a/lakeview/build.gradle +++ b/lakeview/build.gradle @@ -61,8 +61,8 @@ dependencies { exclude group: "com.google.protobuf", module: "protobuf-java-utils" } - implementation 'com.azure:azure-storage-blob:12.30.0' - implementation 'com.azure:azure-identity:1.18.1' + implementation 'com.azure:azure-storage-blob:12.23.0' + implementation 'com.azure:azure-identity:1.10.0' implementation 'org.springframework:spring-beans:5.3.23' From c4819cbfb2327a946f06af4956f9cd213e79561f Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 15:58:03 +0530 Subject: [PATCH 15/17] Remove debug logging --- lakeview/src/main/resources/logback.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/lakeview/src/main/resources/logback.xml b/lakeview/src/main/resources/logback.xml index dc0e03d9..ef1095dc 100644 --- a/lakeview/src/main/resources/logback.xml +++ b/lakeview/src/main/resources/logback.xml @@ -13,8 +13,6 @@ - - From 4bf29cef5dceff08b165f55f044b986008dbb33b Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 18:09:57 +0530 Subject: [PATCH 16/17] Fix uri to blob --- .../onehouse/storage/providers/AzureStorageClientProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java
b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java index 6c3d4f9b..5a25d238 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java +++ b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java @@ -35,7 +35,7 @@ protected BlobServiceAsyncClient createAzureAsyncClient() { validateAzureConfig(azureConfig); BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); - String endpoint = String.format("https://%s.dfs.core.windows.net", azureConfig.getAccountName()); + String endpoint = String.format("https://%s.blob.core.windows.net", azureConfig.getAccountName()); builder.endpoint(endpoint); // Option 1: Connection String (includes account key and endpoint) From 6242061698c69b1aa692e614df4b451f3ce08844 Mon Sep 17 00:00:00 2001 From: Dharmender Sheshma Date: Fri, 23 Jan 2026 21:23:48 +0530 Subject: [PATCH 17/17] Use dfs client --- lakeview/build.gradle | 4 +- .../storage/AzureAsyncStorageClient.java | 102 +++++---- .../providers/AzureStorageClientProvider.java | 16 +- .../storage/AzureAsyncStorageClientTest.java | 203 +++++++++--------- .../AzureStorageClientProviderTest.java | 46 ++-- 5 files changed, 194 insertions(+), 177 deletions(-) diff --git a/lakeview/build.gradle b/lakeview/build.gradle index 03bd5398..b9a914fe 100644 --- a/lakeview/build.gradle +++ b/lakeview/build.gradle @@ -61,7 +61,9 @@ dependencies { exclude group: "com.google.protobuf", module: "protobuf-java-utils" } - implementation 'com.azure:azure-storage-blob:12.23.0' + implementation('com.azure:azure-storage-file-datalake:12.19.1') { + exclude group: 'com.azure', module: 'azure-core-http-netty' + } implementation 'com.azure:azure-identity:1.10.0' implementation 'org.springframework:spring-beans:5.3.23' diff --git a/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java b/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java index 05c54f85..e56020e0 
100644 --- a/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java +++ b/lakeview/src/main/java/ai/onehouse/storage/AzureAsyncStorageClient.java @@ -10,13 +10,14 @@ import com.azure.core.http.rest.PagedFlux; import com.azure.core.http.rest.PagedResponse; import com.azure.core.util.BinaryData; -import com.azure.storage.blob.BlobAsyncClient; -import com.azure.storage.blob.BlobContainerAsyncClient; -import com.azure.storage.blob.BlobServiceAsyncClient; -import com.azure.storage.blob.models.BlobErrorCode; -import com.azure.storage.blob.models.BlobItem; -import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.models.ListBlobsOptions; +import com.azure.storage.file.datalake.DataLakeDirectoryAsyncClient; +import com.azure.storage.file.datalake.DataLakeFileAsyncClient; +import com.azure.storage.file.datalake.DataLakeFileSystemAsyncClient; +import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; +import com.azure.storage.file.datalake.models.DataLakeRequestConditions; +import com.azure.storage.file.datalake.models.DataLakeStorageException; +import com.azure.storage.file.datalake.models.ListPathsOptions; +import com.azure.storage.file.datalake.models.PathItem; import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; import java.io.ByteArrayInputStream; @@ -57,23 +58,23 @@ public CompletableFuture>> fetchObjectsByPage( return CompletableFuture.supplyAsync( () -> { try { - BlobServiceAsyncClient blobServiceClient = + DataLakeServiceAsyncClient dataLakeServiceClient = azureStorageClientProvider.getAzureAsyncClient(); - BlobContainerAsyncClient containerClient = - blobServiceClient.getBlobContainerAsyncClient(containerName); + DataLakeFileSystemAsyncClient fileSystemClient = + dataLakeServiceClient.getFileSystemAsyncClient(containerName); - ListBlobsOptions options = new ListBlobsOptions(); + ListPathsOptions options = new ListPathsOptions(); if (StringUtils.isNotBlank(prefix)) { - 
options.setPrefix(prefix); + options.setPath(prefix); } - PagedFlux pagedFlux = containerClient.listBlobsByHierarchy("/", options); + PagedFlux pagedFlux = fileSystemClient.listPaths(options); List files = new ArrayList<>(); String nextContinuationToken = null; // Get single page with continuation token - try (PagedResponse page = + try (PagedResponse page = StringUtils.isNotBlank(continuationToken) ? pagedFlux.byPage(continuationToken).blockFirst() : pagedFlux.byPage().blockFirst()) { @@ -82,10 +83,10 @@ public CompletableFuture>> fetchObjectsByPage( // Process items in the page page.getElements() .forEach( - blobItem -> { - String blobName = blobItem.getName(); - boolean isDirectory = blobItem.isPrefix() != null && blobItem.isPrefix(); - String fileName = blobName.replaceFirst("^" + prefix, ""); + pathItem -> { + String pathName = pathItem.getName(); + boolean isDirectory = pathItem.isDirectory(); + String fileName = pathName.replaceFirst("^" + prefix, ""); files.add( File.builder() @@ -93,7 +94,7 @@ public CompletableFuture>> fetchObjectsByPage( .lastModifiedAt( isDirectory ? 
Instant.EPOCH - : blobItem.getProperties().getLastModified().toInstant()) + : pathItem.getLastModified().toInstant()) .isDirectory(isDirectory) .build()); }); @@ -105,8 +106,7 @@ public CompletableFuture>> fetchObjectsByPage( return Pair.of(nextContinuationToken, files); } catch (Exception ex) { - log.error("Failed to fetch objects by page", ex); - throw clientException(ex, "fetchObjectsByPage", containerName); + return handleListPathsException(ex, containerName, prefix); } }, executorService); @@ -114,14 +114,14 @@ public CompletableFuture>> fetchObjectsByPage( @VisibleForTesting CompletableFuture readBlob(String azureUri) { - log.debug("Reading Azure blob: {}", azureUri); + log.debug("Reading Azure Data Lake file: {}", azureUri); return CompletableFuture.supplyAsync( () -> { try { - BlobAsyncClient blobClient = getBlobClient(azureUri); - return blobClient.downloadContent().block(); + DataLakeFileAsyncClient fileClient = getFileClient(azureUri); + return BinaryData.fromBytes(fileClient.read().blockLast().array()); } catch (Exception ex) { - log.error("Failed to read blob", ex); + log.error("Failed to read file", ex); throw clientException(ex, "readBlob", azureUri); } }, @@ -144,45 +144,65 @@ public CompletableFuture readFileAsBytes(String azureUri) { return readBlob(azureUri).thenApply(BinaryData::toBytes); } - private BlobAsyncClient getBlobClient(String azureUri) { - String containerName = storageUtils.getBucketNameFromUri(azureUri); - String blobPath = storageUtils.getPathFromUrl(azureUri); + private DataLakeFileAsyncClient getFileClient(String azureUri) { + String fileSystemName = storageUtils.getBucketNameFromUri(azureUri); + String filePath = storageUtils.getPathFromUrl(azureUri); - BlobServiceAsyncClient blobServiceClient = azureStorageClientProvider.getAzureAsyncClient(); - BlobContainerAsyncClient containerClient = - blobServiceClient.getBlobContainerAsyncClient(containerName); - return containerClient.getBlobAsyncClient(blobPath); + 
DataLakeServiceAsyncClient dataLakeServiceClient = azureStorageClientProvider.getAzureAsyncClient(); + DataLakeFileSystemAsyncClient fileSystemClient = + dataLakeServiceClient.getFileSystemAsyncClient(fileSystemName); + return fileSystemClient.getFileAsyncClient(filePath); + } + + private Pair> handleListPathsException( + Exception ex, String containerName, String prefix) { + // DataLake API returns 404 for non-existent paths, treat as empty directory + Throwable wrappedException = ex.getCause() != null ? ex.getCause() : ex; + if (wrappedException instanceof DataLakeStorageException) { + DataLakeStorageException dlsException = (DataLakeStorageException) wrappedException; + if ("PathNotFound".equals(dlsException.getErrorCode()) + || dlsException.getStatusCode() == 404) { + log.debug( + "Path not found, returning empty list for container: {}, prefix: {}", + containerName, + prefix); + return Pair.of(null, new ArrayList<>()); + } + } + log.error("Failed to fetch objects by page", ex); + throw clientException(ex, "fetchObjectsByPage", containerName); } @Override protected RuntimeException clientException(Throwable ex, String operation, String path) { Throwable wrappedException = ex.getCause() != null ? 
ex.getCause() : ex; - if (wrappedException instanceof BlobStorageException) { - BlobStorageException blobException = (BlobStorageException) wrappedException; - BlobErrorCode errorCode = blobException.getErrorCode(); - int statusCode = blobException.getStatusCode(); + if (wrappedException instanceof DataLakeStorageException) { + DataLakeStorageException dataLakeException = (DataLakeStorageException) wrappedException; + String errorCode = dataLakeException.getErrorCode(); + int statusCode = dataLakeException.getStatusCode(); log.error( - "Error in Azure operation: {} on path: {} code: {} status: {} message: {}", + "Error in Azure Data Lake operation: {} on path: {} code: {} status: {} message: {}", operation, path, errorCode, statusCode, - blobException.getMessage()); + dataLakeException.getMessage()); // Map to AccessDeniedException if (statusCode == 403 || statusCode == 401) { return new AccessDeniedException( String.format( "AccessDenied for operation: %s on path: %s with message: %s", - operation, path, blobException.getMessage())); + operation, path, dataLakeException.getMessage())); } // Map to NoSuchKeyException if (errorCode != null - && (errorCode == BlobErrorCode.BLOB_NOT_FOUND - || errorCode == BlobErrorCode.CONTAINER_NOT_FOUND)) { + && (errorCode.equals("PathNotFound") + || errorCode.equals("FilesystemNotFound") + || statusCode == 404)) { return new NoSuchKeyException( String.format("NoSuchKey for operation: %s on path: %s", operation, path)); } diff --git a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java index 5a25d238..e0944797 100644 --- a/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java +++ b/lakeview/src/main/java/ai/onehouse/storage/providers/AzureStorageClientProvider.java @@ -7,8 +7,8 @@ import com.azure.identity.ClientSecretCredentialBuilder; import 
com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; -import com.azure.storage.blob.BlobServiceAsyncClient; -import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; import com.azure.storage.common.StorageSharedKeyCredential; import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; @@ -20,7 +20,7 @@ public class AzureStorageClientProvider { private final AzureConfig azureConfig; - private static BlobServiceAsyncClient azureAsyncClient; + private static DataLakeServiceAsyncClient azureAsyncClient; private static final Logger logger = LoggerFactory.getLogger(AzureStorageClientProvider.class); @Inject @@ -30,12 +30,12 @@ public AzureStorageClientProvider(@Nonnull Config config) { } @VisibleForTesting - protected BlobServiceAsyncClient createAzureAsyncClient() { - logger.debug("Instantiating Azure Blob Storage client"); + protected DataLakeServiceAsyncClient createAzureAsyncClient() { + logger.debug("Instantiating Azure Data Lake Storage client"); validateAzureConfig(azureConfig); - BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); - String endpoint = String.format("https://%s.blob.core.windows.net", azureConfig.getAccountName()); + DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder(); + String endpoint = String.format("https://%s.dfs.core.windows.net", azureConfig.getAccountName()); builder.endpoint(endpoint); // Option 1: Connection String (includes account key and endpoint) @@ -79,7 +79,7 @@ protected BlobServiceAsyncClient createAzureAsyncClient() { return builder.buildAsyncClient(); } - public BlobServiceAsyncClient getAzureAsyncClient() { + public DataLakeServiceAsyncClient getAzureAsyncClient() { if (azureAsyncClient == null) { azureAsyncClient = createAzureAsyncClient(); } diff --git 
a/lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java b/lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java index d74f9ede..8f71d225 100644 --- a/lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java +++ b/lakeview/src/test/java/ai/onehouse/storage/AzureAsyncStorageClientTest.java @@ -16,14 +16,13 @@ import com.azure.core.http.rest.PagedFlux; import com.azure.core.http.rest.PagedResponse; import com.azure.core.util.BinaryData; -import com.azure.storage.blob.BlobAsyncClient; -import com.azure.storage.blob.BlobContainerAsyncClient; -import com.azure.storage.blob.BlobServiceAsyncClient; -import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.file.datalake.DataLakeFileAsyncClient; +import com.azure.storage.file.datalake.DataLakeFileSystemAsyncClient; +import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; +import com.azure.storage.file.datalake.models.DataLakeStorageException; import com.azure.core.util.IterableStream; -import com.azure.storage.blob.models.BlobItem; -import com.azure.storage.blob.models.BlobItemProperties; -import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.file.datalake.models.PathItem; +import java.nio.ByteBuffer; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -54,29 +53,28 @@ class AzureAsyncStorageClientTest { @Mock private AzureStorageClientProvider mockAzureStorageClientProvider; @Mock private StorageUtils mockStorageUtils; - @Mock private BlobServiceAsyncClient mockBlobServiceAsyncClient; - @Mock private BlobContainerAsyncClient mockContainerAsyncClient; - @Mock private BlobAsyncClient mockBlobAsyncClient; - @Mock private PagedFlux mockPagedFlux; - @Mock private PagedResponse mockPagedResponse1; - @Mock private PagedResponse mockPagedResponse2; - @Mock private BlobItem mockBlobItem1; - @Mock private BlobItem mockBlobItem2; - @Mock private BlobItemProperties 
mockBlobItemProperties; + @Mock private DataLakeServiceAsyncClient mockDataLakeServiceAsyncClient; + @Mock private DataLakeFileSystemAsyncClient mockFileSystemAsyncClient; + @Mock private DataLakeFileAsyncClient mockFileAsyncClient; + @Mock private PagedFlux mockPagedFlux; + @Mock private PagedResponse mockPagedResponse1; + @Mock private PagedResponse mockPagedResponse2; + @Mock private PathItem mockPathItem1; + @Mock private PathItem mockPathItem2; private AzureAsyncStorageClient azureAsyncStorageClient; private static final String AZURE_URI = - "https://testaccount.blob.core.windows.net/test-container/test-blob"; + "https://testaccount.dfs.core.windows.net/test-container/test-file"; private static final String TEST_CONTAINER = "test-container"; - private static final String TEST_BLOB = "test-blob"; + private static final String TEST_FILE = "test-file"; @BeforeEach void setup() { lenient() .when(mockAzureStorageClientProvider.getAzureAsyncClient()) - .thenReturn(mockBlobServiceAsyncClient); + .thenReturn(mockDataLakeServiceAsyncClient); lenient().when(mockStorageUtils.getBucketNameFromUri(AZURE_URI)).thenReturn(TEST_CONTAINER); - lenient().when(mockStorageUtils.getPathFromUrl(AZURE_URI)).thenReturn(TEST_BLOB); + lenient().when(mockStorageUtils.getPathFromUrl(AZURE_URI)).thenReturn(TEST_FILE); azureAsyncStorageClient = new AzureAsyncStorageClient( mockAzureStorageClientProvider, mockStorageUtils, ForkJoinPool.commonPool()); @@ -87,30 +85,29 @@ void testListAllFilesInDir() throws ExecutionException, InterruptedException { String fileName = "file1"; String dirName = "dir1/"; String continuationToken = "page_2"; - String prefix = TEST_BLOB + "/"; + String prefix = TEST_FILE + "/"; - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + 
.thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.listPaths(any())) .thenReturn(mockPagedFlux); when(mockPagedFlux.byPage()).thenReturn(Flux.just(mockPagedResponse1)); when(mockPagedFlux.byPage(continuationToken)).thenReturn(Flux.just(mockPagedResponse2)); // First page when(mockPagedResponse1.getElements()) - .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); + .thenReturn(IterableStream.of(Arrays.asList(mockPathItem1))); when(mockPagedResponse1.getContinuationToken()).thenReturn(continuationToken); - when(mockBlobItem1.getName()).thenReturn(prefix + fileName); - when(mockBlobItem1.isPrefix()).thenReturn(null); - when(mockBlobItem1.getProperties()).thenReturn(mockBlobItemProperties); - when(mockBlobItemProperties.getLastModified()).thenReturn(OffsetDateTime.now()); + when(mockPathItem1.getName()).thenReturn(prefix + fileName); + when(mockPathItem1.isDirectory()).thenReturn(false); + when(mockPathItem1.getLastModified()).thenReturn(OffsetDateTime.now()); // Second page when(mockPagedResponse2.getElements()) - .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem2))); + .thenReturn(IterableStream.of(Arrays.asList(mockPathItem2))); when(mockPagedResponse2.getContinuationToken()).thenReturn(null); - when(mockBlobItem2.getName()).thenReturn(prefix + dirName); - when(mockBlobItem2.isPrefix()).thenReturn(true); + when(mockPathItem2.getName()).thenReturn(prefix + dirName); + when(mockPathItem2.isDirectory()).thenReturn(true); List result = azureAsyncStorageClient.listAllFilesInDir(AZURE_URI).get(); @@ -128,20 +125,19 @@ void testFetchObjectsByPage() throws ExecutionException, InterruptedException { String continuationToken = "token"; String nextToken = "next-token"; - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + 
.thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.listPaths(any())) .thenReturn(mockPagedFlux); when(mockPagedFlux.byPage(continuationToken)) .thenReturn(Flux.just(mockPagedResponse1)); when(mockPagedResponse1.getElements()) - .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); + .thenReturn(IterableStream.of(Arrays.asList(mockPathItem1))); when(mockPagedResponse1.getContinuationToken()).thenReturn(nextToken); - when(mockBlobItem1.getName()).thenReturn(prefix + "/" + fileName); - when(mockBlobItem1.isPrefix()).thenReturn(null); - when(mockBlobItem1.getProperties()).thenReturn(mockBlobItemProperties); - when(mockBlobItemProperties.getLastModified()).thenReturn(OffsetDateTime.now()); + when(mockPathItem1.getName()).thenReturn(prefix + "/" + fileName); + when(mockPathItem1.isDirectory()).thenReturn(false); + when(mockPathItem1.getLastModified()).thenReturn(OffsetDateTime.now()); Pair> result = azureAsyncStorageClient @@ -159,19 +155,18 @@ void testFetchObjectsByPageWithoutContinuationToken() String fileName = "file1"; String prefix = "prefix"; - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.listPaths(any())) .thenReturn(mockPagedFlux); when(mockPagedFlux.byPage()).thenReturn(Flux.just(mockPagedResponse1)); when(mockPagedResponse1.getElements()) - .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); + .thenReturn(IterableStream.of(Arrays.asList(mockPathItem1))); when(mockPagedResponse1.getContinuationToken()).thenReturn(null); - when(mockBlobItem1.getName()).thenReturn(prefix + "/" + fileName); - when(mockBlobItem1.isPrefix()).thenReturn(null); - when(mockBlobItem1.getProperties()).thenReturn(mockBlobItemProperties); - 
when(mockBlobItemProperties.getLastModified()).thenReturn(OffsetDateTime.now()); + when(mockPathItem1.getName()).thenReturn(prefix + "/" + fileName); + when(mockPathItem1.isDirectory()).thenReturn(false); + when(mockPathItem1.getLastModified()).thenReturn(OffsetDateTime.now()); Pair> result = azureAsyncStorageClient.fetchObjectsByPage(TEST_CONTAINER, prefix, null, null).get(); @@ -183,12 +178,12 @@ void testFetchObjectsByPageWithoutContinuationToken() @Test void testReadBlob() throws ExecutionException, InterruptedException { byte[] fileContent = "test content".getBytes(StandardCharsets.UTF_8); - BinaryData binaryData = BinaryData.fromBytes(fileContent); + ByteBuffer byteBuffer = ByteBuffer.wrap(fileContent); - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); - when(mockBlobAsyncClient.downloadContent()).thenReturn(Mono.just(binaryData)); + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.getFileAsyncClient(TEST_FILE)).thenReturn(mockFileAsyncClient); + when(mockFileAsyncClient.read()).thenReturn(Flux.just(byteBuffer)); BinaryData result = azureAsyncStorageClient.readBlob(AZURE_URI).get(); @@ -199,12 +194,12 @@ void testReadBlob() throws ExecutionException, InterruptedException { @Test void testStreamFileAsync() throws ExecutionException, InterruptedException, IOException { byte[] fileContent = "test content".getBytes(StandardCharsets.UTF_8); - BinaryData binaryData = BinaryData.fromBytes(fileContent); + ByteBuffer byteBuffer = ByteBuffer.wrap(fileContent); - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); - 
when(mockBlobAsyncClient.downloadContent()).thenReturn(Mono.just(binaryData)); + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.getFileAsyncClient(TEST_FILE)).thenReturn(mockFileAsyncClient); + when(mockFileAsyncClient.read()).thenReturn(Flux.just(byteBuffer)); FileStreamData result = azureAsyncStorageClient.streamFileAsync(AZURE_URI).get(); @@ -218,12 +213,12 @@ void testStreamFileAsync() throws ExecutionException, InterruptedException, IOEx @Test void testReadFileAsBytes() throws ExecutionException, InterruptedException { byte[] fileContent = "test content".getBytes(StandardCharsets.UTF_8); - BinaryData binaryData = BinaryData.fromBytes(fileContent); + ByteBuffer byteBuffer = ByteBuffer.wrap(fileContent); - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); - when(mockBlobAsyncClient.downloadContent()).thenReturn(Mono.just(binaryData)); + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.getFileAsyncClient(TEST_FILE)).thenReturn(mockFileAsyncClient); + when(mockFileAsyncClient.read()).thenReturn(Flux.just(byteBuffer)); byte[] result = azureAsyncStorageClient.readFileAsBytes(AZURE_URI).get(); @@ -231,13 +226,13 @@ void testReadFileAsBytes() throws ExecutionException, InterruptedException { } @ParameterizedTest - @MethodSource("generateBlobStorageExceptionTestCases") - void testReadBlobWithBlobStorageException( - BlobStorageException exception, Class expectedExceptionClass) { - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); - 
when(mockBlobAsyncClient.downloadContent()).thenAnswer(invocation -> Mono.error(exception)); + @MethodSource("generateDataLakeStorageExceptionTestCases") + void testReadBlobWithDataLakeStorageException( + DataLakeStorageException exception, Class expectedExceptionClass) { + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.getFileAsyncClient(TEST_FILE)).thenReturn(mockFileAsyncClient); + when(mockFileAsyncClient.read()).thenAnswer(invocation -> Flux.error(exception)); CompletableFuture future = azureAsyncStorageClient.readBlob(AZURE_URI); CompletionException executionException = assertThrows(CompletionException.class, future::join); @@ -246,38 +241,38 @@ void testReadBlobWithBlobStorageException( assertInstanceOf(expectedExceptionClass, cause); } - static Stream generateBlobStorageExceptionTestCases() { - BlobStorageException accessDenied403 = mock(BlobStorageException.class); + static Stream generateDataLakeStorageExceptionTestCases() { + DataLakeStorageException accessDenied403 = mock(DataLakeStorageException.class); when(accessDenied403.getStatusCode()).thenReturn(403); when(accessDenied403.getMessage()).thenReturn("Access denied"); when(accessDenied403.getErrorCode()).thenReturn(null); - BlobStorageException unauthorized401 = mock(BlobStorageException.class); + DataLakeStorageException unauthorized401 = mock(DataLakeStorageException.class); when(unauthorized401.getStatusCode()).thenReturn(401); when(unauthorized401.getMessage()).thenReturn("Unauthorized"); when(unauthorized401.getErrorCode()).thenReturn(null); - BlobStorageException blobNotFound = mock(BlobStorageException.class); - when(blobNotFound.getStatusCode()).thenReturn(404); - when(blobNotFound.getErrorCode()).thenReturn(BlobErrorCode.BLOB_NOT_FOUND); - when(blobNotFound.getMessage()).thenReturn("Blob not found"); + DataLakeStorageException pathNotFound = mock(DataLakeStorageException.class); + 
when(pathNotFound.getStatusCode()).thenReturn(404); + when(pathNotFound.getErrorCode()).thenReturn("PathNotFound"); + when(pathNotFound.getMessage()).thenReturn("Path not found"); - BlobStorageException containerNotFound = mock(BlobStorageException.class); - when(containerNotFound.getStatusCode()).thenReturn(404); - when(containerNotFound.getErrorCode()).thenReturn(BlobErrorCode.CONTAINER_NOT_FOUND); - when(containerNotFound.getMessage()).thenReturn("Container not found"); + DataLakeStorageException filesystemNotFound = mock(DataLakeStorageException.class); + when(filesystemNotFound.getStatusCode()).thenReturn(404); + when(filesystemNotFound.getErrorCode()).thenReturn("FilesystemNotFound"); + when(filesystemNotFound.getMessage()).thenReturn("Filesystem not found"); - BlobStorageException tooManyRequests = mock(BlobStorageException.class); + DataLakeStorageException tooManyRequests = mock(DataLakeStorageException.class); when(tooManyRequests.getStatusCode()).thenReturn(429); when(tooManyRequests.getMessage()).thenReturn("Too many requests"); when(tooManyRequests.getErrorCode()).thenReturn(null); - BlobStorageException serviceUnavailable = mock(BlobStorageException.class); + DataLakeStorageException serviceUnavailable = mock(DataLakeStorageException.class); when(serviceUnavailable.getStatusCode()).thenReturn(503); when(serviceUnavailable.getMessage()).thenReturn("Service unavailable"); when(serviceUnavailable.getErrorCode()).thenReturn(null); - BlobStorageException internalError = mock(BlobStorageException.class); + DataLakeStorageException internalError = mock(DataLakeStorageException.class); when(internalError.getStatusCode()).thenReturn(500); when(internalError.getMessage()).thenReturn("Internal error"); when(internalError.getErrorCode()).thenReturn(null); @@ -285,8 +280,8 @@ static Stream generateBlobStorageExceptionTestCases() { return Stream.of( Arguments.of(accessDenied403, AccessDeniedException.class), Arguments.of(unauthorized401, 
AccessDeniedException.class), - Arguments.of(blobNotFound, NoSuchKeyException.class), - Arguments.of(containerNotFound, NoSuchKeyException.class), + Arguments.of(pathNotFound, NoSuchKeyException.class), + Arguments.of(filesystemNotFound, NoSuchKeyException.class), Arguments.of(tooManyRequests, RateLimitException.class), Arguments.of(serviceUnavailable, RateLimitException.class), Arguments.of(internalError, ObjectStorageClientException.class)); @@ -296,10 +291,10 @@ static Stream generateBlobStorageExceptionTestCases() { @MethodSource("generateWrappedExceptionTestCases") void testReadBlobWithWrappedException( RuntimeException exception, Class expectedExceptionClass) { - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); - when(mockBlobAsyncClient.downloadContent()).thenAnswer(invocation -> Mono.error(exception)); + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.getFileAsyncClient(TEST_FILE)).thenReturn(mockFileAsyncClient); + when(mockFileAsyncClient.read()).thenAnswer(invocation -> Flux.error(exception)); CompletableFuture future = azureAsyncStorageClient.readBlob(AZURE_URI); CompletionException executionException = assertThrows(CompletionException.class, future::join); @@ -317,14 +312,14 @@ static Stream generateWrappedExceptionTestCases() { @Test void testFetchObjectsByPageWithException() { - BlobStorageException exception = mock(BlobStorageException.class); + DataLakeStorageException exception = mock(DataLakeStorageException.class); when(exception.getStatusCode()).thenReturn(429); when(exception.getMessage()).thenReturn("Throttled"); when(exception.getErrorCode()).thenReturn(null); - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - 
.thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.listPaths(any())) .thenThrow(exception); CompletableFuture>> future = @@ -353,10 +348,10 @@ void testInitializeClient() { void testReadBlobWithGenericException() { RuntimeException exception = new RuntimeException("Generic error"); - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.getBlobAsyncClient(TEST_BLOB)).thenReturn(mockBlobAsyncClient); - when(mockBlobAsyncClient.downloadContent()).thenAnswer(invocation -> Mono.error(exception)); + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.getFileAsyncClient(TEST_FILE)).thenReturn(mockFileAsyncClient); + when(mockFileAsyncClient.read()).thenAnswer(invocation -> Flux.error(exception)); CompletableFuture future = azureAsyncStorageClient.readBlob(AZURE_URI); CompletionException executionException = assertThrows(CompletionException.class, future::join); @@ -370,17 +365,17 @@ void testFetchObjectsByPageWithDirectoryItems() throws ExecutionException, Inter String dirName = "dir1/"; String prefix = "prefix"; - when(mockBlobServiceAsyncClient.getBlobContainerAsyncClient(TEST_CONTAINER)) - .thenReturn(mockContainerAsyncClient); - when(mockContainerAsyncClient.listBlobsByHierarchy(eq("/"), any())) + when(mockDataLakeServiceAsyncClient.getFileSystemAsyncClient(TEST_CONTAINER)) + .thenReturn(mockFileSystemAsyncClient); + when(mockFileSystemAsyncClient.listPaths(any())) .thenReturn(mockPagedFlux); when(mockPagedFlux.byPage()).thenReturn(Flux.just(mockPagedResponse1)); when(mockPagedResponse1.getElements()) - .thenReturn(IterableStream.of(Arrays.asList(mockBlobItem1))); 
+ .thenReturn(IterableStream.of(Arrays.asList(mockPathItem1))); when(mockPagedResponse1.getContinuationToken()).thenReturn(null); - when(mockBlobItem1.getName()).thenReturn(prefix + dirName); - when(mockBlobItem1.isPrefix()).thenReturn(true); + when(mockPathItem1.getName()).thenReturn(prefix + dirName); + when(mockPathItem1.isDirectory()).thenReturn(true); Pair> result = azureAsyncStorageClient.fetchObjectsByPage(TEST_CONTAINER, prefix, null, null).get(); diff --git a/lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java b/lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java index 2deaa006..dc878264 100644 --- a/lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java +++ b/lakeview/src/test/java/ai/onehouse/storage/providers/AzureStorageClientProviderTest.java @@ -11,8 +11,8 @@ import com.azure.identity.ClientSecretCredentialBuilder; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; -import com.azure.storage.blob.BlobServiceAsyncClient; -import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.file.datalake.DataLakeServiceAsyncClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; import com.azure.storage.common.StorageSharedKeyCredential; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -31,7 +31,7 @@ class AzureStorageClientProviderTest { @Mock private ConfigV1 config; @Mock private FileSystemConfiguration fileSystemConfiguration; @Mock private AzureConfig azureConfig; - @Mock private BlobServiceAsyncClient mockBlobServiceAsyncClient; + @Mock private DataLakeServiceAsyncClient mockDataLakeServiceAsyncClient; @BeforeEach void setup() { @@ -70,13 +70,13 @@ void testInstantiateAzureClientWithConnectionString(boolean isRefreshClient) { Optional.of( 
"DefaultEndpointsProtocol=https;AccountName=testaccount;AccountKey=key;EndpointSuffix=core.windows.net")); - try (MockedConstruction mockedBuilder = + try (MockedConstruction mockedBuilder = mockConstruction( - BlobServiceClientBuilder.class, + DataLakeServiceClientBuilder.class, (mock, context) -> { when(mock.endpoint(anyString())).thenReturn(mock); when(mock.connectionString(anyString())).thenReturn(mock); - when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + when(mock.buildAsyncClient()).thenReturn(mockDataLakeServiceAsyncClient); })) { AzureStorageClientProvider azureClientProviderSpy = @@ -84,7 +84,7 @@ void testInstantiateAzureClientWithConnectionString(boolean isRefreshClient) { AzureStorageClientProvider.resetAzureAsyncClient(); if (!isRefreshClient) { - BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + DataLakeServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); assertNotNull(result); } else { azureClientProviderSpy.refreshClient(); @@ -101,13 +101,13 @@ void testInstantiateAzureClientWithAccountKey(boolean isRefreshClient) { when(azureConfig.getConnectionString()).thenReturn(Optional.empty()); when(azureConfig.getAccountKey()).thenReturn(Optional.of("dGVzdGFjY291bnRrZXk=")); - try (MockedConstruction mockedBuilder = + try (MockedConstruction mockedBuilder = mockConstruction( - BlobServiceClientBuilder.class, + DataLakeServiceClientBuilder.class, (mock, context) -> { when(mock.endpoint(anyString())).thenReturn(mock); when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock); - when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + when(mock.buildAsyncClient()).thenReturn(mockDataLakeServiceAsyncClient); }); MockedConstruction mockedCredential = mockConstruction(StorageSharedKeyCredential.class)) { @@ -117,7 +117,7 @@ void testInstantiateAzureClientWithAccountKey(boolean isRefreshClient) { AzureStorageClientProvider.resetAzureAsyncClient(); if 
(!isRefreshClient) { - BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + DataLakeServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); assertNotNull(result); } else { azureClientProviderSpy.refreshClient(); @@ -137,13 +137,13 @@ void testInstantiateAzureClientWithServicePrincipal(boolean isRefreshClient) { when(azureConfig.getClientId()).thenReturn(Optional.of("test-client-id")); when(azureConfig.getClientSecret()).thenReturn(Optional.of("test-client-secret")); - try (MockedConstruction mockedBuilder = + try (MockedConstruction mockedBuilder = mockConstruction( - BlobServiceClientBuilder.class, + DataLakeServiceClientBuilder.class, (mock, context) -> { when(mock.endpoint(anyString())).thenReturn(mock); when(mock.credential(any(ClientSecretCredential.class))).thenReturn(mock); - when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + when(mock.buildAsyncClient()).thenReturn(mockDataLakeServiceAsyncClient); }); MockedConstruction mockedCredBuilder = mockConstruction( @@ -160,7 +160,7 @@ void testInstantiateAzureClientWithServicePrincipal(boolean isRefreshClient) { AzureStorageClientProvider.resetAzureAsyncClient(); if (!isRefreshClient) { - BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + DataLakeServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); assertNotNull(result); } else { azureClientProviderSpy.refreshClient(); @@ -180,13 +180,13 @@ void testInstantiateAzureClientWithManagedIdentity(boolean isRefreshClient) { when(azureConfig.getClientId()).thenReturn(Optional.of("test-client-id")); when(azureConfig.getClientSecret()).thenReturn(Optional.empty()); - try (MockedConstruction mockedBuilder = + try (MockedConstruction mockedBuilder = mockConstruction( - BlobServiceClientBuilder.class, + DataLakeServiceClientBuilder.class, (mock, context) -> { when(mock.endpoint(anyString())).thenReturn(mock); 
when(mock.credential(any(DefaultAzureCredential.class))).thenReturn(mock); - when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + when(mock.buildAsyncClient()).thenReturn(mockDataLakeServiceAsyncClient); }); MockedConstruction mockedCredBuilder = mockConstruction( @@ -202,7 +202,7 @@ void testInstantiateAzureClientWithManagedIdentity(boolean isRefreshClient) { AzureStorageClientProvider.resetAzureAsyncClient(); if (!isRefreshClient) { - BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + DataLakeServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); assertNotNull(result); } else { azureClientProviderSpy.refreshClient(); @@ -221,13 +221,13 @@ void testInstantiateAzureClientWithDefaultCredential(boolean isRefreshClient) { when(azureConfig.getTenantId()).thenReturn(Optional.empty()); when(azureConfig.getClientId()).thenReturn(Optional.empty()); - try (MockedConstruction mockedBuilder = + try (MockedConstruction mockedBuilder = mockConstruction( - BlobServiceClientBuilder.class, + DataLakeServiceClientBuilder.class, (mock, context) -> { when(mock.endpoint(anyString())).thenReturn(mock); when(mock.credential(any(DefaultAzureCredential.class))).thenReturn(mock); - when(mock.buildAsyncClient()).thenReturn(mockBlobServiceAsyncClient); + when(mock.buildAsyncClient()).thenReturn(mockDataLakeServiceAsyncClient); }); MockedConstruction mockedCredBuilder = mockConstruction( @@ -241,7 +241,7 @@ void testInstantiateAzureClientWithDefaultCredential(boolean isRefreshClient) { AzureStorageClientProvider.resetAzureAsyncClient(); if (!isRefreshClient) { - BlobServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); + DataLakeServiceAsyncClient result = azureClientProviderSpy.getAzureAsyncClient(); assertNotNull(result); } else { azureClientProviderSpy.refreshClient();