Skip to content

Commit

Permalink
Throw any error from MinIO with ListObject APIs (#214)
Browse files Browse the repository at this point in the history
* Throw any error from MinIO when listing objects
* Capture ListObjectsAsync exceptions in VerifyObjectExistsAsync
* Configure minio client timeout
* Throw VerifyObjectsException on error
* Convert MinIO exception with custom exceptions
* Update API doc


Signed-off-by: Victor Chang <vicchang@nvidia.com>
  • Loading branch information
mocsharp authored Apr 6, 2023
1 parent e7f640e commit 1757165
Show file tree
Hide file tree
Showing 13 changed files with 600 additions and 64 deletions.
3 changes: 2 additions & 1 deletion src/Plugins/MinIO/ConfigurationKeys.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@ internal static class ConfigurationKeys
public static readonly string McExecutablePath = "executableLocation";
public static readonly string McServiceName = "serviceName";
public static readonly string CreateBuckets = "createBuckets";
public static readonly string ApiCallTimeout = "timeout";

public static readonly string[] RequiredKeys = new[] { EndPoint, AccessKey, AccessToken, SecuredConnection, Region };
public static readonly string[] McRequiredKeys = new[] { EndPoint, AccessKey, AccessToken, McExecutablePath, McServiceName };
Expand Down
9 changes: 8 additions & 1 deletion src/Plugins/MinIO/LoggerMethods.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/

using Microsoft.Extensions.Logging;
using Minio.Exceptions;

namespace Monai.Deploy.Storage.MinIO
{
Expand Down Expand Up @@ -43,5 +44,11 @@ public static partial class LoggerMethods

[LoggerMessage(EventId = 20007, Level = LogLevel.Information, Message = "Bucket {bucket} created in region {region}.")]
public static partial void BucketCreated(this ILogger logger, string bucket, string region);

[LoggerMessage(EventId = 20008, Level = LogLevel.Error, Message = "Error connecting to MinIO.")]
public static partial void ConnectionError(this ILogger logger, ConnectionException ex);

[LoggerMessage(EventId = 20009, Level = LogLevel.Error, Message = "Storage service error.")]
public static partial void StorageServiceError(this ILogger logger, Exception ex);
}
}
12 changes: 10 additions & 2 deletions src/Plugins/MinIO/MinIoClientFactory.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@ namespace Monai.Deploy.Storage.MinIO
public class MinIoClientFactory : IMinIoClientFactory
{
private static readonly string DefaultClient = "_DEFAULT_";
internal static readonly int DefaultTimeout = 2500;
private readonly ConcurrentDictionary<string, MinioClient> _clients;

private StorageServiceConfiguration Options { get; }
Expand Down Expand Up @@ -112,10 +113,17 @@ private MinioClient CreateClient(string accessKey, string accessToken)
{
var endpoint = Options.Settings[ConfigurationKeys.EndPoint];
var securedConnection = Options.Settings[ConfigurationKeys.SecuredConnection];
var timeout = DefaultTimeout;

if (Options.Settings.ContainsKey(ConfigurationKeys.ApiCallTimeout) && !int.TryParse(Options.Settings[ConfigurationKeys.ApiCallTimeout], out timeout))
{
throw new ConfigurationException($"Invalid value specified for {ConfigurationKeys.ApiCallTimeout}: {Options.Settings[ConfigurationKeys.ApiCallTimeout]}");
}

var client = new MinioClient()
.WithEndpoint(endpoint)
.WithCredentials(accessKey, accessToken);
.WithCredentials(accessKey, accessToken)
.WithTimeout(timeout);

if (bool.Parse(securedConnection))
{
Expand Down
205 changes: 147 additions & 58 deletions src/Plugins/MinIO/MinIoStorageService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,10 +20,12 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Minio;
using Minio.Exceptions;
using Monai.Deploy.Storage.API;
using Monai.Deploy.Storage.Configuration;
using Monai.Deploy.Storage.S3Policy;
using Newtonsoft.Json;
using ObjectNotFoundException = Minio.Exceptions.ObjectNotFoundException;

namespace Monai.Deploy.Storage.MinIO
{
Expand All @@ -39,7 +41,7 @@ public class MinIoStorageService : IStorageService
public MinIoStorageService(IMinIoClientFactory minioClientFactory, IAmazonSecurityTokenServiceClientFactory amazonSecurityTokenServiceClientFactory, IOptions<StorageServiceConfiguration> options, ILogger<MinIoStorageService> logger)
{
Guard.Against.Null(options);
_minioClientFactory = minioClientFactory ?? throw new ArgumentNullException(nameof(IMinIoClientFactory));
_minioClientFactory = minioClientFactory ?? throw new ArgumentNullException(nameof(minioClientFactory));
_amazonSecurityTokenServiceClientFactory = amazonSecurityTokenServiceClientFactory ?? throw new ArgumentNullException(nameof(amazonSecurityTokenServiceClientFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

Expand Down Expand Up @@ -101,13 +103,14 @@ public async Task<Dictionary<string, bool>> VerifyObjectsExistAsync(string bucke
Guard.Against.Null(artifactList);

var existingObjectsDict = new Dictionary<string, bool>();
var exceptions = new List<Exception>();

foreach (var artifact in artifactList)
{
try
{
var fileObjects = await ListObjectsAsync(bucketName, artifact).ConfigureAwait(false);
var folderObjects = await ListObjectsAsync(bucketName, artifact.EndsWith("/") ? artifact : $"{artifact}/", true).ConfigureAwait(false);
var fileObjects = await ListObjectsAsync(bucketName, artifact, cancellationToken: cancellationToken).ConfigureAwait(false);
var folderObjects = await ListObjectsAsync(bucketName, artifact.EndsWith("/") ? artifact : $"{artifact}/", true, cancellationToken).ConfigureAwait(false);

if (!folderObjects.Any() && !fileObjects.Any())
{
Expand All @@ -122,10 +125,14 @@ public async Task<Dictionary<string, bool>> VerifyObjectsExistAsync(string bucke
{
_logger.VerifyObjectError(bucketName, e);
existingObjectsDict.Add(artifact, false);
exceptions.Add(e);
}

}

if (exceptions.Any())
{
throw new VerifyObjectsException(exceptions, existingObjectsDict);
}
return existingObjectsDict;
}

Expand All @@ -134,17 +141,25 @@ public async Task<bool> VerifyObjectExistsAsync(string bucketName, string artifa
Guard.Against.NullOrWhiteSpace(bucketName);
Guard.Against.NullOrWhiteSpace(artifactName);

var fileObjects = await ListObjectsAsync(bucketName, artifactName).ConfigureAwait(false);
var folderObjects = await ListObjectsAsync(bucketName, artifactName.EndsWith("/") ? artifactName : $"{artifactName}/", true).ConfigureAwait(false);

if (folderObjects.Any() || fileObjects.Any())
try
{
return true;
}
var fileObjects = await ListObjectsAsync(bucketName, artifactName, cancellationToken: cancellationToken).ConfigureAwait(false);
var folderObjects = await ListObjectsAsync(bucketName, artifactName.EndsWith("/") ? artifactName : $"{artifactName}/", true, cancellationToken).ConfigureAwait(false);

if (folderObjects.Any() || fileObjects.Any())
{
return true;
}

_logger.FileNotFoundError(bucketName, $"{artifactName}");
_logger.FileNotFoundError(bucketName, $"{artifactName}");

return false;
return false;
}
catch (Exception ex)
{
_logger.VerifyObjectError(bucketName, ex);
throw new VerifyObjectsException(ex.Message, ex);
}
}

public async Task PutObjectAsync(string bucketName, string objectName, Stream data, long size, string contentType, Dictionary<string, string>? metadata, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -295,36 +310,51 @@ public async Task CreateFolderWithCredentialsAsync(string bucketName, string fol

#region Internal Helper Methods

private static async Task CopyObjectUsingClient(IObjectOperations client, string sourceBucketName, string sourceObjectName, string destinationBucketName, string destinationObjectName, CancellationToken cancellationToken)
private async Task CopyObjectUsingClient(IObjectOperations client, string sourceBucketName, string sourceObjectName, string destinationBucketName, string destinationObjectName, CancellationToken cancellationToken)
{
var copySourceObjectArgs = new CopySourceObjectArgs()
.WithBucket(sourceBucketName)
.WithObject(sourceObjectName);
var copyObjectArgs = new CopyObjectArgs()
.WithBucket(destinationBucketName)
.WithObject(destinationObjectName)
.WithCopyObjectSource(copySourceObjectArgs);
await client.CopyObjectAsync(copyObjectArgs, cancellationToken).ConfigureAwait(false);
await CallApi(async () =>
{
try
{
var copySourceObjectArgs = new CopySourceObjectArgs()
.WithBucket(sourceBucketName)
.WithObject(sourceObjectName);
var copyObjectArgs = new CopyObjectArgs()
.WithBucket(destinationBucketName)
.WithObject(destinationObjectName)
.WithCopyObjectSource(copySourceObjectArgs);
await client.CopyObjectAsync(copyObjectArgs, cancellationToken).ConfigureAwait(false);
}
catch (ObjectNotFoundException ex) when (ex.ServerMessage.Contains("Not found", StringComparison.OrdinalIgnoreCase))
{
throw new API.StorageObjectNotFoundException(ex.ServerMessage);
}
}).ConfigureAwait(false);
}

private static async Task GetObjectUsingClient(IObjectOperations client, string bucketName, string objectName, Action<Stream> callback, CancellationToken cancellationToken)
private async Task GetObjectUsingClient(IObjectOperations client, string bucketName, string objectName, Action<Stream> callback, CancellationToken cancellationToken)
{
var args = new GetObjectArgs()
.WithBucket(bucketName)
.WithObject(objectName)
.WithCallbackStream(callback);
await client.GetObjectAsync(args, cancellationToken).ConfigureAwait(false);
await CallApi(async () =>
{
var args = new GetObjectArgs()
.WithBucket(bucketName)
.WithObject(objectName)
.WithCallbackStream(callback);
await client.GetObjectAsync(args, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}

private async Task<IList<VirtualFileInfo>> ListObjectsUsingClient(IBucketOperations client, string bucketName, string? prefix, bool recursive, CancellationToken cancellationToken)
private Task<IList<VirtualFileInfo>> ListObjectsUsingClient(IBucketOperations client, string bucketName, string? prefix, bool recursive, CancellationToken cancellationToken)
{
return await Task.Run(() =>
var files = new List<VirtualFileInfo>();
var listArgs = new ListObjectsArgs()
.WithBucket(bucketName)
.WithPrefix(prefix)
.WithRecursive(recursive);

try
{
var files = new List<VirtualFileInfo>();
var listArgs = new ListObjectsArgs()
.WithBucket(bucketName)
.WithPrefix(prefix)
.WithRecursive(recursive);
var done = new TaskCompletionSource<IList<VirtualFileInfo>>();

var objservable = client.ListObjectsAsync(listArgs, cancellationToken);
var completedEvent = new ManualResetEventSlim(false);
Expand All @@ -341,44 +371,103 @@ private async Task<IList<VirtualFileInfo>> ListObjectsUsingClient(IBucketOperati
error =>
{
_logger.ListObjectError(bucketName, error.Message);
if (error is OperationCanceledException)
done.SetException(error);
else
done.SetException(new ListObjectException(error.ToString()));
},
() => completedEvent.Set(), cancellationToken);
() =>
{
done.SetResult(files);
if (cancellationToken.IsCancellationRequested)
{
throw new ListObjectTimeoutException("Timed out waiting for results.");
}
}, cancellationToken);

completedEvent.Wait(cancellationToken);
return files;
}).ConfigureAwait(false);
return done.Task;
}
catch (ConnectionException ex)
{
_logger.ConnectionError(ex);
var iex = new StorageConnectionException(ex.Message);
iex.Errors.Add(ex.ServerMessage);
if (ex.ServerResponse is not null && !string.IsNullOrWhiteSpace(ex.ServerResponse.ErrorMessage))
{
iex.Errors.Add(ex.ServerResponse.ErrorMessage);
}
throw iex;
}
catch (Exception ex) when (ex is not ListObjectTimeoutException && ex is not ListObjectException)
{
_logger.StorageServiceError(ex);
throw new StorageServiceException(ex.ToString());
}
}

private static async Task RemoveObjectUsingClient(IObjectOperations client, string bucketName, string objectName, CancellationToken cancellationToken)
private async Task RemoveObjectUsingClient(IObjectOperations client, string bucketName, string objectName, CancellationToken cancellationToken)
{
var args = new RemoveObjectArgs()
await CallApi(async () =>
{
var args = new RemoveObjectArgs()
.WithBucket(bucketName)
.WithObject(objectName);
await client.RemoveObjectAsync(args, cancellationToken).ConfigureAwait(false);
await client.RemoveObjectAsync(args, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}

private static async Task PutObjectUsingClient(IObjectOperations client, string bucketName, string objectName, Stream data, long size, string contentType, Dictionary<string, string>? metadata, CancellationToken cancellationToken)
private async Task PutObjectUsingClient(IObjectOperations client, string bucketName, string objectName, Stream data, long size, string contentType, Dictionary<string, string>? metadata, CancellationToken cancellationToken)
{
var args = new PutObjectArgs()
.WithBucket(bucketName)
.WithObject(objectName)
.WithStreamData(data)
.WithObjectSize(size)
.WithContentType(contentType);
if (metadata is not null)
await CallApi(async () =>
{
args.WithHeaders(metadata);
}
var args = new PutObjectArgs()
.WithBucket(bucketName)
.WithObject(objectName)
.WithStreamData(data)
.WithObjectSize(size)
.WithContentType(contentType);
if (metadata is not null)
{
args.WithHeaders(metadata);
}

await client.PutObjectAsync(args, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}

await client.PutObjectAsync(args, cancellationToken).ConfigureAwait(false);
private async Task RemoveObjectsUsingClient(IObjectOperations client, string bucketName, IEnumerable<string> objectNames, CancellationToken cancellationToken)
{
await CallApi(async () =>
{
var args = new RemoveObjectsArgs()
.WithBucket(bucketName)
.WithObjects(objectNames.ToList());
await client.RemoveObjectsAsync(args, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}

private static async Task RemoveObjectsUsingClient(IObjectOperations client, string bucketName, IEnumerable<string> objectNames, CancellationToken cancellationToken)
private async Task CallApi(Func<Task> func)
{
var args = new RemoveObjectsArgs()
.WithBucket(bucketName)
.WithObjects(objectNames.ToList());
await client.RemoveObjectsAsync(args, cancellationToken).ConfigureAwait(false);
try
{
await func().ConfigureAwait(false);
}
catch (ConnectionException ex)
{
_logger.ConnectionError(ex);
var iex = new StorageConnectionException(ex.Message);
iex.Errors.Add(ex.ServerMessage);
if (ex.ServerResponse is not null && !string.IsNullOrWhiteSpace(ex.ServerResponse.ErrorMessage))
{
iex.Errors.Add(ex.ServerResponse.ErrorMessage);
}
throw iex;
}
catch (Exception ex)
{
_logger.StorageServiceError(ex);
throw new StorageServiceException(ex.ToString());
}
}

#endregion Internal Helper Methods
Expand Down
3 changes: 1 addition & 2 deletions src/Plugins/MinIO/Tests/Unit/MinIoHealthCheckTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Minio;
using Moq;
using Xunit;

Expand Down
Loading

0 comments on commit 1757165

Please sign in to comment.