Skip to content
5 changes: 5 additions & 0 deletions lakeview/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ dependencies {
exclude group: "com.google.protobuf", module: "protobuf-java-utils"
}

implementation('com.azure:azure-storage-file-datalake:12.19.1') {
exclude group: 'com.azure', module: 'azure-core-http-netty'
}
implementation 'com.azure:azure-identity:1.10.0'

implementation 'org.springframework:spring-beans:5.3.23'

testImplementation "org.mockito:mockito-core:${versions.mockito}"
Expand Down
8 changes: 8 additions & 0 deletions lakeview/src/main/java/ai/onehouse/RuntimeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import ai.onehouse.config.ConfigProvider;
import ai.onehouse.config.models.common.FileSystemConfiguration;
import ai.onehouse.storage.AsyncStorageClient;
import ai.onehouse.storage.AzureAsyncStorageClient;
import ai.onehouse.storage.GCSAsyncStorageClient;
import ai.onehouse.storage.S3AsyncStorageClient;
import ai.onehouse.storage.StorageUtils;
import ai.onehouse.storage.providers.AzureStorageClientProvider;
import ai.onehouse.storage.providers.GcsClientProvider;
import ai.onehouse.storage.providers.S3AsyncClientProvider;

Expand Down Expand Up @@ -128,10 +130,13 @@ static AsyncStorageClient providesAsyncStorageClientForDiscovery(
StorageUtils storageUtils,
@TableDiscoveryS3ObjectStorageClient S3AsyncClientProvider s3AsyncClientProvider,
GcsClientProvider gcsClientProvider,
AzureStorageClientProvider azureStorageClientProvider,
ExecutorService executorService) {
FileSystemConfiguration fileSystemConfiguration = config.getFileSystemConfiguration();
if (fileSystemConfiguration.getS3Config() != null) {
return new S3AsyncStorageClient(s3AsyncClientProvider, storageUtils, executorService);
} else if (fileSystemConfiguration.getAzureConfig() != null) {
return new AzureAsyncStorageClient(azureStorageClientProvider, storageUtils, executorService);
} else {
return new GCSAsyncStorageClient(gcsClientProvider, storageUtils, executorService);
}
Expand All @@ -145,10 +150,13 @@ static AsyncStorageClient providesAsyncStorageClientForUpload(
StorageUtils storageUtils,
@TableMetadataUploadS3ObjectStorageClient S3AsyncClientProvider s3AsyncClientProvider,
GcsClientProvider gcsClientProvider,
AzureStorageClientProvider azureStorageClientProvider,
ExecutorService executorService) {
FileSystemConfiguration fileSystemConfiguration = config.getFileSystemConfiguration();
if (fileSystemConfiguration.getS3Config() != null) {
return new S3AsyncStorageClient(s3AsyncClientProvider, storageUtils, executorService);
} else if (fileSystemConfiguration.getAzureConfig() != null) {
return new AzureAsyncStorageClient(azureStorageClientProvider, storageUtils, executorService);
} else {
return new GCSAsyncStorageClient(gcsClientProvider, storageUtils, executorService);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ai.onehouse.config.models.common;

import java.util.Optional;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.jackson.Jacksonized;

/**
 * Azure storage (ADLS Gen2 / Blob) connection settings, bound from application config.
 *
 * <p>{@code accountName} is mandatory; the credential options below are alternatives and at most
 * one is expected to be populated. If none is set, the consuming client presumably falls back to
 * a default Azure credential chain — TODO confirm against AzureStorageClientProvider.
 *
 * <p>NOTE(review): Jackson does not bind plain JSON strings into {@code Optional<String>} fields
 * unless the jdk8 datatype module is registered on the mapper — verify deserialization of the
 * optional fields actually works with the project's ObjectMapper configuration.
 */
@Builder
@Getter
@Jacksonized
@EqualsAndHashCode
public class AzureConfig {
  // Storage account name, e.g. the "account" in https://account.dfs.core.windows.net
  @NonNull private String accountName;

  // Optional authentication methods
  // Option 1: Account Key (for dev/testing, never expires)
  @Builder.Default private Optional<String> accountKey = Optional.empty();

  // Option 2: Connection String (alternative to account key)
  @Builder.Default private Optional<String> connectionString = Optional.empty();

  // Option 3: Service Principal (tenantId + clientId + clientSecret used together)
  @Builder.Default private Optional<String> tenantId = Optional.empty();
  @Builder.Default private Optional<String> clientId = Optional.empty();
  @Builder.Default private Optional<String> clientSecret = Optional.empty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
public class FileSystemConfiguration {
private S3Config s3Config;
private GCSConfig gcsConfig;
private AzureConfig azureConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
public class StorageConstants {
private StorageConstants() {}

// typical s3 path: "s3://bucket-name/path/to/object"
// gcs path format "gs:// [bucket] /path/to/file"
/*
* typical s3 path: "s3://bucket-name/path/to/object"
* gcs path format: "gs://bucket/path/to/file"
* azure blob format: "https://account.blob.core.windows.net/container/path/to/blob"
* azure adls gen2 format: "https://account.dfs.core.windows.net/container/path/to/file"
*/
public static final Pattern OBJECT_STORAGE_URI_PATTERN =
Pattern.compile("^(s3://|gs://)([^/]+)(/.*)?");
Pattern.compile("^(?:(s3://|gs://)|https://[^.]+\\.(?:blob|dfs)\\.core\\.windows\\.net/)([^/]+)(/.*)?$");

// https://cloud.google.com/compute/docs/naming-resources#resource-name-format
public static final String GCP_RESOURCE_NAME_FORMAT = "^[a-z]([-a-z0-9]*[a-z0-9])$";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package ai.onehouse.storage;

import ai.onehouse.exceptions.AccessDeniedException;
import ai.onehouse.exceptions.NoSuchKeyException;
import ai.onehouse.exceptions.ObjectStorageClientException;
import ai.onehouse.exceptions.RateLimitException;
import ai.onehouse.storage.models.File;
import ai.onehouse.storage.models.FileStreamData;
import ai.onehouse.storage.providers.AzureStorageClientProvider;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.util.BinaryData;
import com.azure.core.util.FluxUtil;
import com.azure.storage.file.datalake.DataLakeDirectoryAsyncClient;
import com.azure.storage.file.datalake.DataLakeFileAsyncClient;
import com.azure.storage.file.datalake.DataLakeFileSystemAsyncClient;
import com.azure.storage.file.datalake.DataLakeServiceAsyncClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.ListPathsOptions;
import com.azure.storage.file.datalake.models.PathItem;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.io.ByteArrayInputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

/**
 * {@link AbstractAsyncStorageClient} implementation backed by Azure Data Lake Storage Gen2.
 *
 * <p>The Azure SDK is reactive; each public method wraps the blocking {@code block*} call in a
 * {@link CompletableFuture} executed on the injected {@link ExecutorService} so callers get the
 * same async contract as the S3/GCS clients.
 */
@Slf4j
public class AzureAsyncStorageClient extends AbstractAsyncStorageClient {
  private final AzureStorageClientProvider azureStorageClientProvider;

  /**
   * @param azureStorageClientProvider supplies the underlying {@link DataLakeServiceAsyncClient}
   * @param storageUtils parses container ("bucket") name and path out of storage URIs
   * @param executorService pool on which the blocking reactive calls run
   */
  @Inject
  public AzureAsyncStorageClient(
      @Nonnull AzureStorageClientProvider azureStorageClientProvider,
      @Nonnull StorageUtils storageUtils,
      @Nonnull ExecutorService executorService) {
    super(executorService, storageUtils);
    this.azureStorageClientProvider = azureStorageClientProvider;
  }

  /**
   * Lists one page of paths under {@code prefix} in the given container (ADLS file system).
   *
   * @param containerName ADLS file-system / container name
   * @param prefix directory path to list under; blank means the container root
   * @param continuationToken token from a previous page, or blank for the first page
   * @param startAfter unused by the ADLS listing API (kept for interface parity) — TODO confirm
   *     callers do not rely on it
   * @return pair of (next continuation token or null when exhausted, files in this page)
   */
  @Override
  public CompletableFuture<Pair<String, List<File>>> fetchObjectsByPage(
      String containerName, String prefix, String continuationToken, String startAfter) {

    log.debug(
        "fetching files in container {} with prefix {} continuationToken {} startAfter {}",
        containerName,
        prefix,
        continuationToken,
        startAfter);

    return CompletableFuture.supplyAsync(
        () -> {
          try {
            DataLakeServiceAsyncClient dataLakeServiceClient =
                azureStorageClientProvider.getAzureAsyncClient();
            DataLakeFileSystemAsyncClient fileSystemClient =
                dataLakeServiceClient.getFileSystemAsyncClient(containerName);

            ListPathsOptions options = new ListPathsOptions();
            if (StringUtils.isNotBlank(prefix)) {
              options.setPath(prefix);
            }

            PagedFlux<PathItem> pagedFlux = fileSystemClient.listPaths(options);

            List<File> files = new ArrayList<>();
            String nextContinuationToken = null;

            // Fetch exactly one page; PagedResponse is Closeable, hence try-with-resources.
            try (PagedResponse<PathItem> page =
                StringUtils.isNotBlank(continuationToken)
                    ? pagedFlux.byPage(continuationToken).blockFirst()
                    : pagedFlux.byPage().blockFirst()) {

              if (page != null) {
                page.getElements()
                    .forEach(
                        pathItem -> {
                          String pathName = pathItem.getName();
                          boolean isDirectory = pathItem.isDirectory();
                          // Strip the listing prefix literally. The previous
                          // replaceFirst("^" + prefix, ...) treated the prefix as a regex,
                          // which broke on metacharacters and on a null prefix ("^null").
                          String fileName =
                              StringUtils.isNotBlank(prefix) && pathName.startsWith(prefix)
                                  ? pathName.substring(prefix.length())
                                  : pathName;

                          files.add(
                              File.builder()
                                  .filename(fileName)
                                  .lastModifiedAt(
                                      // Directories carry no meaningful mtime for our purposes.
                                      isDirectory
                                          ? Instant.EPOCH
                                          : pathItem.getLastModified().toInstant())
                                  .isDirectory(isDirectory)
                                  .build());
                        });

                nextContinuationToken = page.getContinuationToken();
              }
            }

            return Pair.of(nextContinuationToken, files);
          } catch (Exception ex) {
            return handleListPathsException(ex, containerName, prefix);
          }
        },
        executorService);
  }

  /**
   * Reads the entire file at {@code azureUri} into memory.
   *
   * @param azureUri full URI of the file, parseable by {@link StorageUtils}
   * @return future completing with the file contents
   */
  @VisibleForTesting
  CompletableFuture<BinaryData> readBlob(String azureUri) {
    log.debug("Reading Azure Data Lake file: {}", azureUri);
    return CompletableFuture.supplyAsync(
        () -> {
          try {
            DataLakeFileAsyncClient fileClient = getFileClient(azureUri);
            // read() emits the file as a stream of ByteBuffer chunks. Collect ALL of them:
            // the previous read().blockLast().array() kept only the final chunk (silently
            // truncating any multi-chunk file) and ByteBuffer.array() can expose a larger
            // backing array with an offset.
            byte[] content =
                FluxUtil.collectBytesInByteBufferStream(fileClient.read()).block();
            return BinaryData.fromBytes(content);
          } catch (Exception ex) {
            log.error("Failed to read file", ex);
            throw clientException(ex, "readBlob", azureUri);
          }
        },
        executorService);
  }

  /** Streams the file by buffering it fully in memory first (no true streaming). */
  @Override
  public CompletableFuture<FileStreamData> streamFileAsync(String azureUri) {
    return readBlob(azureUri)
        .thenApply(
            binaryData -> {
              // toBytes() is called once so size and stream share the same snapshot.
              byte[] bytes = binaryData.toBytes();
              return FileStreamData.builder()
                  .inputStream(new ByteArrayInputStream(bytes))
                  .fileSize((long) bytes.length)
                  .build();
            });
  }

  /** Reads the whole file into a byte array. */
  @Override
  public CompletableFuture<byte[]> readFileAsBytes(String azureUri) {
    return readBlob(azureUri).thenApply(BinaryData::toBytes);
  }

  /** Resolves a file client for {@code azureUri} via container name + path parsed by StorageUtils. */
  private DataLakeFileAsyncClient getFileClient(String azureUri) {
    String fileSystemName = storageUtils.getBucketNameFromUri(azureUri);
    String filePath = storageUtils.getPathFromUrl(azureUri);

    DataLakeServiceAsyncClient dataLakeServiceClient =
        azureStorageClientProvider.getAzureAsyncClient();
    DataLakeFileSystemAsyncClient fileSystemClient =
        dataLakeServiceClient.getFileSystemAsyncClient(fileSystemName);
    return fileSystemClient.getFileAsyncClient(filePath);
  }

  /**
   * Maps listing failures: a 404/PathNotFound is treated as an empty directory (parity with the
   * S3/GCS behavior of listing a non-existent prefix); everything else is rethrown via
   * {@link #clientException}.
   */
  private Pair<String, List<File>> handleListPathsException(
      Exception ex, String containerName, String prefix) {
    // Reactor wraps SDK errors; unwrap one level to reach DataLakeStorageException.
    Throwable wrappedException = ex.getCause() != null ? ex.getCause() : ex;
    if (wrappedException instanceof DataLakeStorageException) {
      DataLakeStorageException dlsException = (DataLakeStorageException) wrappedException;
      if ("PathNotFound".equals(dlsException.getErrorCode())
          || dlsException.getStatusCode() == 404) {
        log.debug(
            "Path not found, returning empty list for container: {}, prefix: {}",
            containerName,
            prefix);
        return Pair.of(null, new ArrayList<>());
      }
    }
    log.error("Failed to fetch objects by page", ex);
    throw clientException(ex, "fetchObjectsByPage", containerName);
  }

  /**
   * Translates Azure SDK exceptions into the storage-agnostic exception hierarchy so callers can
   * handle access/not-found/throttling uniformly across cloud providers.
   */
  @Override
  protected RuntimeException clientException(Throwable ex, String operation, String path) {
    Throwable wrappedException = ex.getCause() != null ? ex.getCause() : ex;

    if (wrappedException instanceof DataLakeStorageException) {
      DataLakeStorageException dataLakeException = (DataLakeStorageException) wrappedException;
      String errorCode = dataLakeException.getErrorCode();
      int statusCode = dataLakeException.getStatusCode();

      log.error(
          "Error in Azure Data Lake operation: {} on path: {} code: {} status: {} message: {}",
          operation,
          path,
          errorCode,
          statusCode,
          dataLakeException.getMessage());

      // 401/403 -> AccessDeniedException
      if (statusCode == 403 || statusCode == 401) {
        return new AccessDeniedException(
            String.format(
                "AccessDenied for operation: %s on path: %s with message: %s",
                operation, path, dataLakeException.getMessage()));
      }

      // 404 / PathNotFound / FilesystemNotFound -> NoSuchKeyException
      if (errorCode != null
          && (errorCode.equals("PathNotFound")
              || errorCode.equals("FilesystemNotFound")
              || statusCode == 404)) {
        return new NoSuchKeyException(
            String.format("NoSuchKey for operation: %s on path: %s", operation, path));
      }

      // 429/503 -> RateLimitException
      if (statusCode == 429 || statusCode == 503) {
        return new RateLimitException(
            String.format("Throttled by Azure for operation: %s on path: %s", operation, path));
      }
    } else if (wrappedException instanceof AccessDeniedException
        || wrappedException instanceof NoSuchKeyException
        || wrappedException instanceof RateLimitException) {
      // Already one of ours — pass it through untouched.
      return (RuntimeException) wrappedException;
    }

    return new ObjectStorageClientException(ex);
  }

  @Override
  public void refreshClient() {
    azureStorageClientProvider.refreshClient();
  }

  @Override
  public void initializeClient() {
    // Forces eager construction of the service client so credential problems surface early.
    azureStorageClientProvider.getAzureAsyncClient();
  }
}
Loading
Loading