From 7969bd63783525d62fc9d8c083f4f798e9bbacb3 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Tue, 22 Oct 2024 14:38:01 -0700 Subject: [PATCH 1/2] Support configuration of managed identity via Azure Functions configuration section, and replace most Azure Storage SDK V11 references with the V12 equivalents. --- .../BlobLogger.cs | 23 ++- ...ationSectionBasedConnectionNameResolver.cs | 153 +++++++++++++++ .../LoggerFactoryWrapper.cs | 2 - .../NameResolverBasedConnectionResolver.cs | 27 --- .../NetheriteProviderFactory.cs | 2 +- .../NetheriteProviderStartup.cs | 2 +- .../WebJobsConfigurationExtensions.cs | 52 +++++ .../CompatibilityConnectionResolver.cs | 1 - .../Connections/ConnectionInfo.cs | 47 ++++- .../Connections/ConnectionInfoExtensions.cs | 49 +---- ...onnectionNameToConnectionStringResolver.cs | 1 - .../Connections/CredentialShim.cs | 52 ----- .../Connections/EventHubsUtil.cs | 2 +- .../OrchestrationService/Client.cs | 1 - .../NetheriteOrchestrationService.cs | 1 - .../NetheriteOrchestrationServiceSettings.cs | 4 +- .../Scaling/AzureBlobLoadPublisher.cs | 55 +++--- .../Faster/AzureBlobs/BlobManager.cs | 6 +- .../Faster/AzureBlobs/BlobUtil.cs | 77 -------- .../Faster/AzureBlobs/StorageOperations.cs | 9 +- .../StorageLayer/Faster/CheckpointInjector.cs | 1 - .../StorageLayer/Faster/FasterKV.cs | 1 - .../Faster/FasterStorageProvider.cs | 65 +++---- .../StorageLayer/Faster/FaultInjector.cs | 6 - .../StorageLayer/Faster/PartitionStorage.cs | 1 - src/DurableTask.Netherite/Util/BlobUtils.cs | 178 ------------------ .../Util/BlobUtilsV11.cs | 38 ++++ .../Util/BlobUtilsV12.cs | 115 ++++++++++- 28 files changed, 481 insertions(+), 490 deletions(-) create mode 100644 src/DurableTask.Netherite.AzureFunctions/ConfigurationSectionBasedConnectionNameResolver.cs delete mode 100644 src/DurableTask.Netherite.AzureFunctions/NameResolverBasedConnectionResolver.cs create mode 100644 src/DurableTask.Netherite.AzureFunctions/WebJobsConfigurationExtensions.cs delete mode 100644 src/DurableTask.Netherite/Connections/CredentialShim.cs delete mode 100644 src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobUtil.cs delete mode 100644 src/DurableTask.Netherite/Util/BlobUtils.cs create mode 100644 src/DurableTask.Netherite/Util/BlobUtilsV11.cs diff --git a/src/DurableTask.Netherite.AzureFunctions/BlobLogger.cs b/src/DurableTask.Netherite.AzureFunctions/BlobLogger.cs index e07660d8..2914dc08 100644 --- a/src/DurableTask.Netherite.AzureFunctions/BlobLogger.cs +++ b/src/DurableTask.Netherite.AzureFunctions/BlobLogger.cs @@ -9,9 +9,8 @@ namespace DurableTask.Netherite.AzureFunctions using System.IO; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Storage; - using Microsoft.Azure.Storage.Blob; - using Microsoft.Extensions.Azure; + using Azure.Storage.Blobs; + using Azure.Storage.Blobs.Specialized; /// /// A simple utility class for writing text to an append blob in Azure Storage, using a periodic timer. @@ -20,7 +19,7 @@ namespace DurableTask.Netherite.AzureFunctions class BlobLogger { readonly DateTime starttime; - readonly Task blob; + readonly Task blob; readonly object flushLock = new object(); readonly object lineLock = new object(); readonly ConcurrentQueue writebackQueue; @@ -36,14 +35,14 @@ public BlobLogger(ConnectionInfo storageConnection, string hubName, string worke this.starttime = DateTime.UtcNow; this.blob = GetBlobAsync(); - async Task GetBlobAsync() + async Task GetBlobAsync() { - CloudStorageAccount storageAccount = await storageConnection.GetAzureStorageV11AccountAsync(); - CloudBlobClient client = storageAccount.CreateCloudBlobClient(); - CloudBlobContainer container = client.GetContainerReference("logs"); - container.CreateIfNotExists(); - var blob = container.GetAppendBlobReference($"{hubName}.{workerId}.{this.starttime:o}.log"); - await blob.CreateOrReplaceAsync(); + BlobServiceClient blobServiceClient = storageConnection.GetAzureStorageV12BlobServiceClient(new Azure.Storage.Blobs.BlobClientOptions()); + BlobContainerClient containerClient = blobServiceClient.GetBlobContainerClient("logs"); + await containerClient.CreateIfNotExistsAsync(); + var blob = containerClient.GetAppendBlobClient($"{hubName}.{workerId}.{this.starttime:o}.log"); + await blob.DeleteIfExistsAsync(); + await blob.CreateIfNotExistsAsync(); return blob; } @@ -95,7 +94,7 @@ public void Flush(object ignored) { // save to storage toSave.Seek(0, SeekOrigin.Begin); - this.blob.GetAwaiter().GetResult().AppendFromStream(toSave); + this.blob.GetAwaiter().GetResult().AppendBlock(toSave); toSave.Dispose(); } } diff --git a/src/DurableTask.Netherite.AzureFunctions/ConfigurationSectionBasedConnectionNameResolver.cs b/src/DurableTask.Netherite.AzureFunctions/ConfigurationSectionBasedConnectionNameResolver.cs new file mode 100644 index 00000000..3a94dc52 --- /dev/null +++ b/src/DurableTask.Netherite.AzureFunctions/ConfigurationSectionBasedConnectionNameResolver.cs @@ -0,0 +1,153 @@ +namespace DurableTask.Netherite.AzureFunctions +{ + using System; + using System.Collections.Generic; + using System.Security.Cryptography; + using System.Text; + using Azure.Identity; + using Azure.Messaging.EventHubs; + using DurableTask.Netherite; + using Microsoft.Extensions.Azure; + using Microsoft.Extensions.Configuration; + + /// + /// Resolves connections using an AzureComponentFactory and configuration sections. + /// + public class ConfigurationSectionBasedConnectionNameResolver : DurableTask.Netherite.ConnectionResolver + { + readonly AzureComponentFactory componentFactory; + readonly IConfiguration configuration; + + public ConfigurationSectionBasedConnectionNameResolver(AzureComponentFactory componentFactory, IConfiguration configuration) + { + this.componentFactory = componentFactory; + this.configuration = configuration; + } + + public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice) + { + if (TransportConnectionString.IsPseudoConnectionString(connectionName)) + { + TransportConnectionString.Parse(connectionName, out storageChoice, out transportChoice); + } + else + { + IConfigurationSection connectionSection = this.configuration.GetWebJobsConnectionStringSection(connectionName); + + if (TransportConnectionString.IsPseudoConnectionString(connectionSection.Value)) + { + TransportConnectionString.Parse(connectionSection.Value, out storageChoice, out transportChoice); + } + else + { + // the default settings are Faster and EventHubs + storageChoice = StorageChoices.Faster; + transportChoice = TransportChoices.EventHubs; + } + } + } + + public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType resourceType) + { + switch (resourceType) + { + case ResourceType.BlobStorage: + case ResourceType.TableStorage: + case ResourceType.PageBlobStorage: + return this.ResolveStorageAccountConnection(connectionName, resourceType); + + case ResourceType.EventHubsNamespace: + return this.ResolveEventHubsConnection(connectionName); + + default: + throw new NotSupportedException("unknown resource type"); + } + } + + public ConnectionInfo ResolveStorageAccountConnection(string connectionName, ResourceType resourceType) + { + IConfigurationSection connectionSection = this.configuration.GetWebJobsConnectionStringSection(connectionName); + + if (!string.IsNullOrWhiteSpace(connectionSection.Value)) + { + // It's a connection string + return ConnectionInfo.FromStorageConnectionString(connectionSection.Value, resourceType); + } + + // parse some of the relevant fields in the configuration section + StorageAccountOptions accountOptions = connectionSection.Get(); + + var tokenCredential = this.componentFactory.CreateTokenCredential(connectionSection); + + return ConnectionInfo.FromTokenCredentialAndHost(tokenCredential, accountOptions.GetHost(resourceType), resourceType); + } + + class StorageAccountOptions + { + public string AccountName { get; set; } + + public Uri BlobServiceUri { get; set; } + + public Uri TableServiceUri { get; set; } + + public string GetHost(ResourceType resourceType) + { + switch (resourceType) + { + case ResourceType.BlobStorage: + case ResourceType.PageBlobStorage: + return this.BlobServiceUri?.Host ?? $"{this.AccountName}.blob.core.windows.net"; + + case ResourceType.TableStorage: + return this.TableServiceUri?.Host ?? $"{this.AccountName}.table.core.windows.net"; + + default: + throw new NotSupportedException("unknown resource type"); + } + } + } + + public ConnectionInfo ResolveEventHubsConnection(string connectionName) + { + IConfigurationSection connectionSection = this.configuration.GetWebJobsConnectionStringSection(connectionName); + if (!connectionSection.Exists()) + { + // A common mistake is for developers to set their `connection` to a full connection string rather + // than an informational name. We handle this case specifically, to be helpful, and to avoid leaking secrets in error messages. + try + { + var properties = EventHubsConnectionStringProperties.Parse(connectionName); + + // we parsed without exception, so it's a connection string. + // We now throw a descriptive and secret-free exception. + + throw new NetheriteConfigurationException($"a full event hubs connection string was incorrectly used instead of a connection setting name"); + } + catch (FormatException) + { + } + + // Not found + throw new NetheriteConfigurationException($"EventHub account connection string with name '{connectionName}' does not exist in the settings. " + + $"Make sure that it is a defined App Setting."); + } + + if (!string.IsNullOrWhiteSpace(connectionSection.Value)) + { + // It's a connection string + return ConnectionInfo.FromEventHubsConnectionString(connectionSection.Value); + } + + var fullyQualifiedNamespace = connectionSection["fullyQualifiedNamespace"]; + if (string.IsNullOrWhiteSpace(fullyQualifiedNamespace)) + { + // We could not find the necessary parameter + throw new NetheriteConfigurationException($"Configuration for event hubs connection should have a 'fullyQualifiedNamespace' property or be a string representing a connection string."); + } + + var tokenCredential = this.componentFactory.CreateTokenCredential(connectionSection); + + return ConnectionInfo.FromTokenCredentialAndHost(tokenCredential, fullyQualifiedNamespace, ResourceType.EventHubsNamespace); + } + } +} diff --git a/src/DurableTask.Netherite.AzureFunctions/LoggerFactoryWrapper.cs b/src/DurableTask.Netherite.AzureFunctions/LoggerFactoryWrapper.cs index 61175183..6bbf0ab2 100644 --- a/src/DurableTask.Netherite.AzureFunctions/LoggerFactoryWrapper.cs +++ b/src/DurableTask.Netherite.AzureFunctions/LoggerFactoryWrapper.cs @@ -7,8 +7,6 @@ namespace DurableTask.Netherite.AzureFunctions using System; using System.IO; using System.Threading; - using Microsoft.Azure.Storage; - using Microsoft.Azure.Storage.Blob; using Microsoft.Extensions.Logging; class LoggerFactoryWrapper : ILoggerFactory diff --git a/src/DurableTask.Netherite.AzureFunctions/NameResolverBasedConnectionResolver.cs b/src/DurableTask.Netherite.AzureFunctions/NameResolverBasedConnectionResolver.cs deleted file mode 100644 index 8c286fd6..00000000 --- a/src/DurableTask.Netherite.AzureFunctions/NameResolverBasedConnectionResolver.cs +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask.Netherite.AzureFunctions -{ - using System; - using System.Collections.Generic; - using System.Text; - using Azure.Identity; - using Microsoft.Azure.WebJobs; - using Microsoft.Extensions.Logging.Abstractions; - -#if !NETCOREAPP2_2 - - /// - /// Resolves connections using a token credential and a mapping from connection names to resource names. - /// - class NameResolverBasedConnectionNameResolver : DurableTask.Netherite.ConnectionNameToConnectionStringResolver - { - public NameResolverBasedConnectionNameResolver(INameResolver nameResolver) - : base((string name) => nameResolver.Resolve(name)) - { - } - } - -#endif -} \ No newline at end of file diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs index 9a196cf9..cc371cd0 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs @@ -188,7 +188,7 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s if (!string.IsNullOrEmpty(connectionName)) { - if (this.connectionResolver is NameResolverBasedConnectionNameResolver) + if (this.connectionResolver is ConfigurationSectionBasedConnectionNameResolver) { // the application does not define a custom connection resolver. // We split the connection name into two connection names, one for storage and one for event hubs diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderStartup.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderStartup.cs index de4f5583..b30f6a72 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderStartup.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderStartup.cs @@ -19,7 +19,7 @@ public void Configure(IWebJobsBuilder builder) // We use the UnambiguousNetheriteProviderFactory class instead of the base NetheriteProviderFactory class // to avoid ambiguous constructor errors during DI. More details for this workaround can be found in the UnambiguousNetheriteProviderFactory class. builder.Services.AddSingleton(); - builder.Services.TryAddSingleton(); + builder.Services.TryAddSingleton(); #else builder.Services.AddSingleton(); #endif diff --git a/src/DurableTask.Netherite.AzureFunctions/WebJobsConfigurationExtensions.cs b/src/DurableTask.Netherite.AzureFunctions/WebJobsConfigurationExtensions.cs new file mode 100644 index 00000000..f064685a --- /dev/null +++ b/src/DurableTask.Netherite.AzureFunctions/WebJobsConfigurationExtensions.cs @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite.AzureFunctions +{ + using System; + using System.Collections.Generic; + using System.Text; + using Microsoft.Extensions.Configuration; + + static class WebJobsConfigurationExtensions + { + const string WebJobsConfigurationSectionName = "AzureWebJobs"; + + public static IConfigurationSection GetWebJobsConnectionStringSection(this IConfiguration configuration, string connectionStringName) + { + // first try prefixing + string prefixedConnectionStringName = GetPrefixedConnectionStringName(connectionStringName); + IConfigurationSection section = GetConnectionStringOrSetting(configuration, prefixedConnectionStringName); + + if (!section.Exists()) + { + // next try a direct unprefixed lookup + section = GetConnectionStringOrSetting(configuration, connectionStringName); + } + + return section; + } + + public static string GetPrefixedConnectionStringName(string connectionStringName) + { + return WebJobsConfigurationSectionName + connectionStringName; + } + + /// + /// Looks for a connection string by first checking the ConfigurationStrings section, and then the root. + /// + /// The configuration. + /// The connection string key. + /// + public static IConfigurationSection GetConnectionStringOrSetting(this IConfiguration configuration, string connectionName) + { + var connectionStringSection = configuration?.GetSection("ConnectionStrings").GetSection(connectionName); + + if (connectionStringSection.Exists()) + { + return connectionStringSection; + } + return configuration?.GetSection(connectionName); + } + } +} diff --git a/src/DurableTask.Netherite/Connections/CompatibilityConnectionResolver.cs b/src/DurableTask.Netherite/Connections/CompatibilityConnectionResolver.cs index 9a4a2954..043f491b 100644 --- a/src/DurableTask.Netherite/Connections/CompatibilityConnectionResolver.cs +++ b/src/DurableTask.Netherite/Connections/CompatibilityConnectionResolver.cs @@ -7,7 +7,6 @@ namespace DurableTask.Netherite using System.Collections.Generic; using System.Data; using System.Text; - using Microsoft.Azure.Storage; using Microsoft.Extensions.Logging.Abstractions; diff --git a/src/DurableTask.Netherite/Connections/ConnectionInfo.cs b/src/DurableTask.Netherite/Connections/ConnectionInfo.cs index 00d49dd5..77b015d5 100644 --- a/src/DurableTask.Netherite/Connections/ConnectionInfo.cs +++ b/src/DurableTask.Netherite/Connections/ConnectionInfo.cs @@ -4,8 +4,9 @@ namespace DurableTask.Netherite { using System; + using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Storage; + using Azure.Core; /// /// Internal abstraction used for capturing connection information and credentials. @@ -72,10 +73,11 @@ public static ConnectionInfo FromEventHubsConnectionString(string connectionStri /// The connection info. public static ConnectionInfo FromStorageConnectionString(string connectionString, ConnectionResolver.ResourceType resourceType) { - var cloudStorageAccount = CloudStorageAccount.Parse(connectionString); + BlobUtilsV11.ParseStorageConnectionString(connectionString, out string accountName, out Uri tableEndpoint, out Uri blobEndpoint, out Uri queueEndpoint); + return new ConnectionInfo() { - ResourceName = cloudStorageAccount.Credentials.AccountName, + ResourceName = accountName, ConnectionString = connectionString, TokenCredential = null, HostName = GetEndpoint().Host, @@ -83,7 +85,7 @@ public static ConnectionInfo FromStorageConnectionString(string connectionString }; Uri GetEndpoint() => resourceType == ConnectionResolver.ResourceType.TableStorage - ? cloudStorageAccount.TableEndpoint : cloudStorageAccount.BlobEndpoint; + ? tableEndpoint : blobEndpoint; } /// @@ -140,9 +142,40 @@ public static ConnectionInfo FromTokenCredential(Azure.Core.TokenCredential toke } /// - /// When converting to a classic storage account, a renewal timer is associated with each CloudStorageAccount instance. We therefore use - /// a single instance to be shared by all. + /// Creates a connection info from a token credential and a endpoint. /// - internal Task CachedStorageAccountTask; + /// The token credential. + /// The name of the host (which must always start with the resource name). + /// The type of the resource. + /// + /// The connection info. + public static ConnectionInfo FromTokenCredentialAndHost(Azure.Core.TokenCredential tokenCredential, string host, ConnectionResolver.ResourceType resourceType) + { + return new ConnectionInfo() + { + ResourceName = host.Split('.')[0], + ConnectionString = null, + TokenCredential = tokenCredential, + HostName = host, + Scopes = resourceType == ConnectionResolver.ResourceType.EventHubsNamespace ? s_eventhubs_scopes : s_storage_scopes, + }; + } + + /// + /// Get an access token for the resource. + /// + /// + /// + public ValueTask GetTokenAsync(ConnectionResolver.ResourceType resourceType, CancellationToken cancellation) + { + if (this.TokenCredential == null) + { + throw new InvalidOperationException("missing token credential"); + } + + TokenRequestContext requestContext = new(resourceType == ConnectionResolver.ResourceType.EventHubsNamespace ? s_eventhubs_scopes : s_storage_scopes); + + return this.TokenCredential.GetTokenAsync(requestContext, cancellation); + } } } diff --git a/src/DurableTask.Netherite/Connections/ConnectionInfoExtensions.cs b/src/DurableTask.Netherite/Connections/ConnectionInfoExtensions.cs index 9500a673..48566f20 100644 --- a/src/DurableTask.Netherite/Connections/ConnectionInfoExtensions.cs +++ b/src/DurableTask.Netherite/Connections/ConnectionInfoExtensions.cs @@ -26,49 +26,6 @@ namespace DurableTask.Netherite /// static class ConnectionInfoExtensions { - /// - /// Returns a classic (v11 SDK) storage account object. - /// - /// The connection info. - /// A task for the storage account object. - /// Thrown if the host name of the connection info is not of the expected format {ResourceName}.{HostNameSuffix}. - public static Task GetAzureStorageV11AccountAsync(this ConnectionInfo connectionInfo) - { - // storage accounts run a token renewal timer, so we want to share a single instance - if (connectionInfo.CachedStorageAccountTask == null) - { - connectionInfo.CachedStorageAccountTask = GetAsync(); - } - return connectionInfo.CachedStorageAccountTask; - - async Task GetAsync() - { - if (connectionInfo.ConnectionString != null) - { - return Microsoft.Azure.Storage.CloudStorageAccount.Parse(connectionInfo.ConnectionString); - } - else - { - var credentials = new Microsoft.Azure.Storage.Auth.StorageCredentials(await connectionInfo.ToLegacyCredentialAsync(CancellationToken.None)); - - // hostnames are generally structured like - // accountname.blob.core.windows.net - // accountname.table.core.windows.net - // databasename.table.cosmos.azure.com - - int firstDot = connectionInfo.HostName.IndexOf('.'); - int secondDot = connectionInfo.HostName.IndexOf('.', firstDot + 1); - string hostNameSuffix = connectionInfo.HostName.Substring(secondDot + 1); - - return new Microsoft.Azure.Storage.CloudStorageAccount( - storageCredentials: credentials, - accountName: connectionInfo.ResourceName, - endpointSuffix: hostNameSuffix, - useHttps: true); - } - } - } - /// /// Creates an Azure Storage table client for the v12 SDK. /// @@ -154,7 +111,7 @@ public static EventProcessorHost CreateEventProcessorHost( /// The request object. /// A cancellation token. /// - public static async Task AuthorizeHttpRequestMessage(this ConnectionInfo connectionInfo, HttpRequestMessage request, CancellationToken cancellationToken) + public static async Task AuthorizeHttpRequestMessage(this ConnectionInfo connectionInfo, ConnectionResolver.ResourceType resourceType, HttpRequestMessage request, CancellationToken cancellationToken) { if (connectionInfo.ConnectionString != null) { @@ -178,8 +135,8 @@ public static async Task AuthorizeHttpRequestMessage(this ConnectionInfo connect } else { - var bearerToken = (await connectionInfo.ToLegacyCredentialAsync(cancellationToken)).Token; - request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", bearerToken); + AccessToken accessToken = await connectionInfo.GetTokenAsync(resourceType, cancellationToken); + request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", accessToken.Token); } } } diff --git a/src/DurableTask.Netherite/Connections/ConnectionNameToConnectionStringResolver.cs b/src/DurableTask.Netherite/Connections/ConnectionNameToConnectionStringResolver.cs index 6f5e0c22..bc40ae4d 100644 --- a/src/DurableTask.Netherite/Connections/ConnectionNameToConnectionStringResolver.cs +++ b/src/DurableTask.Netherite/Connections/ConnectionNameToConnectionStringResolver.cs @@ -7,7 +7,6 @@ namespace DurableTask.Netherite using System.Collections.Generic; using System.Data; using System.Text; - using Microsoft.Azure.Storage; using Microsoft.Extensions.Logging.Abstractions; /// diff --git a/src/DurableTask.Netherite/Connections/CredentialShim.cs b/src/DurableTask.Netherite/Connections/CredentialShim.cs deleted file mode 100644 index 494aa389..00000000 --- a/src/DurableTask.Netherite/Connections/CredentialShim.cs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask.Netherite -{ - using System; - using System.Collections.Generic; - using System.Text; - using Azure.Core; - using Microsoft.Azure.Storage.Auth; - using System.Threading.Tasks; - using System.Threading; - - /// - /// A utility class for constructing a - /// from a . - /// - static class CredentialShim - { - /// - /// Creates a from the connection info. - /// - /// The connection info. - /// A cancellation token. - /// - public static async Task ToLegacyCredentialAsync(this ConnectionInfo connectionInfo, CancellationToken cancellationToken) - { - AccessToken token = await GetTokenAsync(connectionInfo.TokenCredential, connectionInfo.Scopes, cancellationToken); - return new Microsoft.Azure.Storage.Auth.TokenCredential(token.Token, RenewTokenAsync, connectionInfo, NextRefresh(token)); - } - - static ValueTask GetTokenAsync( - Azure.Core.TokenCredential credential, string[] scopes, CancellationToken cancellation) - { - TokenRequestContext request = new(scopes); - return credential.GetTokenAsync(request, cancellation); - } - - static async Task RenewTokenAsync(object state, CancellationToken cancellationToken) - { - ConnectionInfo connectionInfo = (ConnectionInfo)state; - AccessToken token = await GetTokenAsync(connectionInfo.TokenCredential, connectionInfo.Scopes, cancellationToken); - return new(token.Token, NextRefresh(token)); - } - - static TimeSpan NextRefresh(AccessToken token) - { - DateTimeOffset now = DateTimeOffset.UtcNow; - return token.ExpiresOn - now - TimeSpan.FromMinutes(1); // refresh it a bit early. - } - } -} diff --git a/src/DurableTask.Netherite/Connections/EventHubsUtil.cs b/src/DurableTask.Netherite/Connections/EventHubsUtil.cs index 7786676d..d8d5b162 100644 --- a/src/DurableTask.Netherite/Connections/EventHubsUtil.cs +++ b/src/DurableTask.Netherite/Connections/EventHubsUtil.cs @@ -74,7 +74,7 @@ static async Task SendHttpRequest(ConnectionInfo info, stri request.Headers.Add("Host", info.HostName); // add an authorization header to the request - await info.AuthorizeHttpRequestMessage(request, cancellationToken); + await info.AuthorizeHttpRequestMessage(ConnectionResolver.ResourceType.EventHubsNamespace, request, cancellationToken); return await client.SendAsync(request); } diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs index dbf7ccd3..1936160c 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Client.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs @@ -15,7 +15,6 @@ namespace DurableTask.Netherite using DurableTask.Core; using DurableTask.Core.Exceptions; using DurableTask.Core.History; - using Microsoft.Azure.Storage; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using static DurableTask.Netherite.TransportAbstraction; diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index d21adc53..5647c8c1 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -10,7 +10,6 @@ namespace DurableTask.Netherite using DurableTask.Netherite.Abstractions; using DurableTask.Netherite.Faster; using DurableTask.Netherite.Scaling; - using Microsoft.Azure.Storage; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index e96747d3..2775827a 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -497,8 +497,8 @@ public static void ValidateTaskhubName(string taskhubName) try { - Microsoft.Azure.Storage.NameValidator.ValidateContainerName(taskhubName.ToLowerInvariant()); - Microsoft.Azure.Storage.NameValidator.ValidateBlobName(taskhubName); + BlobUtilsV11.ValidateContainerName(taskhubName.ToLowerInvariant()); + BlobUtilsV11.ValidateBlobName(taskhubName); } catch (ArgumentException e) { diff --git a/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs b/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs index c2cf511c..7ce334b8 100644 --- a/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs +++ b/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs @@ -3,13 +3,16 @@ namespace DurableTask.Netherite.Scaling { + using Azure; + using Azure.Storage.Blobs; + using Azure.Storage.Blobs.Models; + using Azure.Storage.Blobs.Specialized; using DurableTask.Netherite.Abstractions; using DurableTask.Netherite.Faster; - using Microsoft.Azure.Storage; - using Microsoft.Azure.Storage.Blob; using Newtonsoft.Json; using System; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Text; using System.Threading; @@ -18,9 +21,10 @@ namespace DurableTask.Netherite.Scaling class AzureBlobLoadPublisher : ILoadPublisherService { readonly string taskHubName; - readonly Task blobContainer; + readonly BlobContainerClient blobContainerClient; readonly string taskhubParametersFilePath; TaskhubParameters parameters; + static readonly BlobUploadOptions BlobUploadOptions = new() { HttpHeaders = new BlobHttpHeaders() { ContentType = "application/json" } }; readonly static JsonSerializerSettings serializerSettings = new JsonSerializerSettings() { @@ -30,17 +34,16 @@ class AzureBlobLoadPublisher : ILoadPublisherService public AzureBlobLoadPublisher(ConnectionInfo connectionInfo, string taskHubName, string taskHubParametersFilePath) { - this.blobContainer = this.GetBlobContainer(connectionInfo, taskHubName); + this.blobContainerClient = this.GetBlobContainer(connectionInfo, taskHubName); this.taskHubName = taskHubName; this.taskhubParametersFilePath = taskHubParametersFilePath; } - async Task GetBlobContainer(ConnectionInfo connectionInfo, string taskHubName) + BlobContainerClient GetBlobContainer(ConnectionInfo connectionInfo, string taskHubName) { - var cloudStorageAccount = await connectionInfo.GetAzureStorageV11AccountAsync(); - CloudBlobClient serviceClient = cloudStorageAccount.CreateCloudBlobClient(); + BlobServiceClient serviceClient = connectionInfo.GetAzureStorageV12BlobServiceClient(new Azure.Storage.Blobs.BlobClientOptions()); string containerName = BlobManager.GetContainerName(taskHubName); - return serviceClient.GetContainerReference(containerName); + return serviceClient.GetBlobContainerClient(containerName); } public TimeSpan PublishInterval => TimeSpan.FromSeconds(10); @@ -55,8 +58,8 @@ async ValueTask LoadParameters(bool throwIfNotFound, CancellationToken can { if (this.parameters == null) { - this.parameters = await this.ReadJsonBlobAsync( - (await this.blobContainer).GetBlockBlobReference(this.taskhubParametersFilePath), + this.parameters = await ReadJsonBlobAsync( + this.blobContainerClient.GetBlockBlobClient(this.taskhubParametersFilePath), throwIfNotFound: throwIfNotFound, throwOnParseError: throwIfNotFound, cancellationToken).ConfigureAwait(false); @@ -64,14 +67,16 @@ async ValueTask LoadParameters(bool throwIfNotFound, CancellationToken can return this.parameters != null; } - public async Task ReadJsonBlobAsync(CloudBlockBlob blob, bool throwIfNotFound, bool throwOnParseError, CancellationToken token) where T : class + static async Task ReadJsonBlobAsync(BlockBlobClient blobClient, bool throwIfNotFound, bool throwOnParseError, CancellationToken token) where T : class { try { - var jsonText = await blob.DownloadTextAsync(token).ConfigureAwait(false); - return JsonConvert.DeserializeObject(jsonText); + var downloadResult = await blobClient.DownloadContentAsync(); + string blobContents = downloadResult.Value.Content.ToString(); + return JsonConvert.DeserializeObject(blobContents); } - catch (StorageException e) when (!throwIfNotFound && e.RequestInformation?.HttpStatusCode == 404) + catch (RequestFailedException ex) + when (BlobUtilsV12.BlobDoesNotExist(ex) && !throwIfNotFound) { // container or blob does not exist } @@ -79,10 +84,6 @@ public async Task ReadJsonBlobAsync(CloudBlockBlob blob, bool throwIfNotFo { // cannot parse content of blob } - catch(StorageException e) when (e.InnerException is OperationCanceledException operationCanceledException) - { - throw new OperationCanceledException("Blob read was canceled.", operationCanceledException); - } return null; } @@ -93,10 +94,9 @@ public async Task PublishAsync(Dictionary info, Cancell async Task UploadPartitionInfo(uint partitionId, PartitionLoadInfo loadInfo) { - var blobDirectory = (await this.blobContainer).GetDirectoryReference($"{this.parameters.TaskhubGuid}/p{partitionId:D2}"); - var blob = blobDirectory.GetBlockBlobReference("loadinfo.json"); + var blobClient = this.blobContainerClient.GetBlockBlobClient($"{this.parameters.TaskhubGuid}/p{partitionId:D2}/loadinfo.json"); var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented, serializerSettings); - await blob.UploadTextAsync(json, cancellationToken); + await blobClient.UploadAsync(new MemoryStream(Encoding.UTF8.GetBytes(json)), BlobUploadOptions, cancellationToken).ConfigureAwait(false); } List tasks = info.Select(kvp => UploadPartitionInfo(kvp.Key, kvp.Value)).ToList(); @@ -109,8 +109,8 @@ public async Task> QueryAsync(CancellationTo async Task<(uint, PartitionLoadInfo)> DownloadPartitionInfo(uint partitionId) { - PartitionLoadInfo info = await this.ReadJsonBlobAsync( - (await this.blobContainer).GetDirectoryReference($"{this.parameters.TaskhubGuid}/p{partitionId:D2}").GetBlockBlobReference("loadinfo.json"), + PartitionLoadInfo info = await ReadJsonBlobAsync( + this.blobContainerClient.GetBlockBlobClient($"{this.parameters.TaskhubGuid}/p{partitionId:D2}/loadinfo.json"), throwIfNotFound: false, throwOnParseError: true, cancellationToken).ConfigureAwait(false); @@ -126,13 +126,8 @@ public async Task DeleteIfExistsAsync(CancellationToken cancellationToken) { if (await this.LoadParameters(throwIfNotFound: false, cancellationToken).ConfigureAwait(false)) { - async Task DeletePartitionInfo(uint partitionId) - { - var blob = (await this.blobContainer).GetDirectoryReference($"{this.parameters.TaskhubGuid}/p{partitionId:D2}").GetBlockBlobReference("loadinfo.json"); - await BlobUtils.ForceDeleteAsync(blob).ConfigureAwait(false); - } - - var tasks = Enumerable.Range(0, this.parameters.PartitionCount).Select(partitionId => DeletePartitionInfo((uint)partitionId)).ToList(); + var tasks = Enumerable.Range(0, this.parameters.PartitionCount).Select(partitionId => + BlobUtilsV12.ForceDeleteAsync(this.blobContainerClient, $"{this.parameters.TaskhubGuid}/p{partitionId:D2}/loadinfo.json")).ToList(); await Task.WhenAll(tasks).ConfigureAwait(false); } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs index d2871f17..a423073a 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs @@ -627,9 +627,9 @@ await this.PerformWithRetriesAsync( throw new OperationCanceledException(message, e); } catch (Exception ex) when (numAttempts < BlobManager.MaxRetries - && !this.PartitionErrorHandler.IsTerminated && BlobUtils.IsTransientStorageError(ex)) + && !this.PartitionErrorHandler.IsTerminated && BlobUtilsV12.IsTransientStorageError(ex)) { - if (BlobUtils.IsTimeout(ex)) + if (BlobUtilsV12.IsTimeout(ex)) { this.TraceHelper.FasterPerfWarning($"Lease acquisition timed out, retrying now"); } @@ -713,7 +713,7 @@ public async Task MaintenanceLoopAsync() // it's o.k. to cancel while waiting this.TraceHelper.LeaseProgress("Lease renewal loop cleanly canceled"); } - catch (Azure.RequestFailedException e) when (e.InnerException != null && e.InnerException is OperationCanceledException) + catch (Azure.RequestFailedException e) when (BlobUtilsV12.IsCancelled(e)) { // it's o.k. to cancel a lease renewal this.TraceHelper.LeaseProgress("Lease renewal storage operation canceled"); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobUtil.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobUtil.cs deleted file mode 100644 index 06cd6d24..00000000 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobUtil.cs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask.Netherite.Faster -{ - using System; - using System.Diagnostics; - using System.Linq; - using Microsoft.Azure.Storage.Blob; - - /// - /// Collection of utility functions used by classes of this package - /// - public static class BlobUtil - { - /// - /// The maximum blob size to use for new page blobs. Page Blobs permit blobs of max size 8 TB, - /// but the emulator permits only 2 GB - /// - public const long MAX_BLOB_SIZE = (long)(2 * 10e8); - - /// - /// Azure Page Blobs have a fixed sector size of 512 bytes. - /// - public const int PAGE_BLOB_SECTOR_SIZE = 512; - - const int OneShotReadMaxFileSize = 1024; - - /// - /// Create a cloud page blob with the given name in the given container - /// - /// reference to the target cloud container - /// name of the intended page blob - /// reference to a created blob - public static CloudPageBlob CreateCloudPageBlob(CloudBlobContainer container, string name) - { - CloudPageBlob blob = container.GetPageBlobReference(name); - // TODO(Tianyu): Will there ever be a race on this? - blob.Create(MAX_BLOB_SIZE); - return blob; - } - - /// - /// Read the metadata out of a blob file (stored in the system as length + bytes). It is assumed that the blob - /// exists. - /// - /// source of the metadata - /// metadata bytes - public static byte[] ReadMetadataFile(CloudPageBlob blob) - { - // Optimization for small metadata file size: we read some maximum length bytes, and if the file size is - // smaller than that, we do not have to issue a second read request after reading the length of metadata. - byte[] firstRead = new byte[OneShotReadMaxFileSize]; - var downloaded = blob.DownloadRangeToByteArray(firstRead, 0, 0, OneShotReadMaxFileSize); - Debug.Assert(downloaded == OneShotReadMaxFileSize); - - - int length = BitConverter.ToInt32(firstRead, 0); - // If file length is less than what we have read, we can return from memory immediately - if (length < OneShotReadMaxFileSize - sizeof(int)) - { - // This still copies the array. But since the cost will be dominated by read request to Azure, I am - // guessing it does not matter - return firstRead.Skip(sizeof(int)).Take(length).ToArray(); - } - - // Otherwise, copy over what we have read and read the remaining bytes. - byte[] result = new byte[length]; - int numBytesRead = OneShotReadMaxFileSize - sizeof(int); - Array.Copy(firstRead, sizeof(int), result, 0, numBytesRead); - downloaded = blob.DownloadRangeToByteArray(result, numBytesRead, OneShotReadMaxFileSize, length - numBytesRead); - Debug.Assert(downloaded == length - numBytesRead, "Underfilled read buffer"); - return result; - } - - } -} \ No newline at end of file diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs index 91367c7e..d1b78172 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs @@ -10,7 +10,6 @@ namespace DurableTask.Netherite.Faster using System.Threading; using System.Threading.Tasks; using DurableTask.Core.Common; - using Microsoft.Azure.Storage; public partial class BlobManager { @@ -94,11 +93,11 @@ public async Task PerformWithRetriesAsync( this.StorageTracer?.FasterStorageProgress(message); throw new OperationCanceledException(message, e); } - catch (Exception e) when (BlobUtils.IsTransientStorageError(e) && numAttempts < BlobManager.MaxRetries) + catch (Exception e) when (BlobUtilsV12.IsTransientStorageError(e) && numAttempts < BlobManager.MaxRetries) { stopwatch.Stop(); - if (BlobUtils.IsTimeout(e)) + if (BlobUtilsV12.IsTimeout(e)) { this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}"); } @@ -204,10 +203,10 @@ public void PerformWithRetries( this.StorageTracer?.FasterStorageProgress(message); throw new OperationCanceledException(message, e); } - catch (Exception e) when (numAttempts < BlobManager.MaxRetries && BlobUtils.IsTransientStorageError(e)) + catch (Exception e) when (numAttempts < BlobManager.MaxRetries && BlobUtilsV12.IsTransientStorageError(e)) { stopwatch.Stop(); - if (BlobUtils.IsTimeout(e)) + if (BlobUtilsV12.IsTimeout(e)) { this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}"); } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs b/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs index 87918e19..b16415e1 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs @@ -10,7 +10,6 @@ namespace DurableTask.Netherite.Faster using System.Text; using System.Threading.Tasks; using FASTER.core; - using Microsoft.Azure.Storage; /// /// Injects checkpoint and compaction decisions into the store worker. diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index a7b323c0..bdbf9380 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -20,7 +20,6 @@ namespace DurableTask.Netherite.Faster using DurableTask.Core.Common; using DurableTask.Core.Tracing; using FASTER.core; - using Microsoft.Azure.Storage.Blob.Protocol; using Newtonsoft.Json; class FasterKV : TrackedObjectStore diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs index a80d9d47..837f2223 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs @@ -6,14 +6,18 @@ namespace DurableTask.Netherite.Faster using System; using System.Collections.Generic; using System.Diagnostics; + using System.IO; using System.Linq; + using System.Text; using System.Threading; using System.Threading.Tasks; + using Azure; + using Azure.Storage.Blobs; + using Azure.Storage.Blobs.Models; + using Azure.Storage.Blobs.Specialized; using DurableTask.Netherite.Abstractions; using DurableTask.Netherite.EventHubsTransport; using DurableTask.Netherite.Scaling; - using Microsoft.Azure.Storage; - using Microsoft.Azure.Storage.Blob; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -26,8 +30,9 @@ class FasterStorageLayer : IStorageLayer readonly ILogger performanceLogger; readonly MemoryTracker memoryTracker; - readonly Task cloudBlobContainer; - readonly Task taskhubParameters; + readonly BlobServiceClient serviceClient; + readonly BlobContainerClient containerClient; + readonly BlockBlobClient taskhubParameters; public ILoadPublisherService LoadPublisher { get;} @@ -61,20 +66,10 @@ public FasterStorageLayer(NetheriteOrchestrationServiceSettings settings, Orches settings.TestHooks.CacheDebugger.MemoryTracker = this.memoryTracker; } - this.cloudBlobContainer = GetBlobContainerAsync(); - async Task GetBlobContainerAsync() - { - var blobContainerName = GetContainerName(settings.HubName); - var cloudBlobClient = (await settings.BlobStorageConnection.GetAzureStorageV11AccountAsync()).CreateCloudBlobClient(); - return cloudBlobClient.GetContainerReference(blobContainerName); - } - - this.taskhubParameters = GetTaskhubParametersAsync(); - async Task GetTaskhubParametersAsync() - { - var cloudBlobContainer = await this.cloudBlobContainer; - return cloudBlobContainer.GetBlockBlobReference(settings.TaskhubParametersFilePath); - } + this.serviceClient = settings.BlobStorageConnection.GetAzureStorageV12BlobServiceClient(new Azure.Storage.Blobs.BlobClientOptions()); + string blobContainerName = GetContainerName(settings.HubName); + this.containerClient = this.serviceClient.GetBlobContainerClient(blobContainerName); + this.taskhubParameters = this.containerClient.GetBlockBlobClient(settings.TaskhubParametersFilePath); this.traceHelper.TraceProgress("Creating LoadPublisher Service"); if (!string.IsNullOrEmpty(settings.LoadInformationAzureTableName)) @@ -99,18 +94,20 @@ void TestRuntimeAndLoading() throw new NotSupportedException("Netherite backend requires 64bit, but current process is 32bit."); } } - + async Task IStorageLayer.TryLoadTaskhubAsync(bool throwIfNotFound) { // try load the taskhub parameters try { - var blob = await this.taskhubParameters; - var jsonText = await blob.DownloadTextAsync(); - return JsonConvert.DeserializeObject(jsonText); + var downloadResult = await this.taskhubParameters.DownloadContentAsync(); + string blobContents = downloadResult.Value.Content.ToString(); + return JsonConvert.DeserializeObject(blobContents); } - catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)System.Net.HttpStatusCode.NotFound) + catch (RequestFailedException ex) + when (BlobUtilsV12.BlobOrContainerDoesNotExist(ex)) { + // container or blob does not exist if (throwIfNotFound) { throw new NetheriteConfigurationException($"The specified taskhub does not exist (TaskHub={this.settings.HubName}, StorageConnectionName={this.settings.StorageConnectionName}"); @@ -120,7 +117,7 @@ async Task IStorageLayer.TryLoadTaskhubAsync(bool throwIfNotF return null; } } - } + } async Task IStorageLayer.CreateTaskhubIfNotExistsAsync() { @@ -132,14 +129,14 @@ async Task IStorageLayer.CreateTaskhubIfNotExistsAsync() return false; } - bool containerCreated = await (await this.cloudBlobContainer).CreateIfNotExistsAsync(); - if (containerCreated) + Response response = await this.containerClient.CreateIfNotExistsAsync(); + if (response.HasValue) { - this.traceHelper.TraceProgress($"Created new blob container at {this.cloudBlobContainer.Result.Uri}"); + this.traceHelper.TraceProgress($"Created new blob container at {this.containerClient.Uri}"); } else { - this.traceHelper.TraceProgress($"Using existing blob container at {this.cloudBlobContainer.Result.Uri}"); + this.traceHelper.TraceProgress($"Using existing blob container at {this.containerClient.Uri}"); } @@ -163,8 +160,12 @@ async Task IStorageLayer.CreateTaskhubIfNotExistsAsync() Newtonsoft.Json.Formatting.Indented, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None }); - var noOverwrite = AccessCondition.GenerateIfNoneMatchCondition("*"); - await (await this.taskhubParameters).UploadTextAsync(jsonText, null, noOverwrite, null, null); + await this.taskhubParameters.UploadAsync( + content: new MemoryStream(Encoding.UTF8.GetBytes(jsonText)), + options: new BlobUploadOptions() { Conditions = new BlobRequestConditions() { IfNoneMatch = ETag.All } }, + cancellationToken: CancellationToken.None) + .ConfigureAwait(false); + this.traceHelper.TraceProgress("Created new taskhub"); // zap the partition hub so we start from zero queue positions @@ -173,7 +174,7 @@ async Task IStorageLayer.CreateTaskhubIfNotExistsAsync() await EventHubsUtil.DeleteEventHubIfExistsAsync(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, CancellationToken.None); } } - catch (StorageException e) when (BlobUtils.BlobAlreadyExists(e)) + catch (RequestFailedException e) when (BlobUtilsV12.BlobAlreadyExists(e)) { // taskhub already exists, possibly because a different node created it faster this.traceHelper.TraceProgress("Confirmed existing taskhub"); @@ -195,7 +196,7 @@ async Task IStorageLayer.DeleteTaskhubAsync() await this.LoadPublisher.DeleteIfExistsAsync(CancellationToken.None).ConfigureAwait(false); // delete the parameters file which deletes the taskhub logically - await BlobUtils.ForceDeleteAsync(await this.taskhubParameters); + await BlobUtilsV12.ForceDeleteAsync(this.containerClient, this.settings.TaskhubParametersFilePath); // delete all the files/blobs in the directory/container that represents this taskhub // If this does not complete successfully, some garbage may be left behind. diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FaultInjector.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FaultInjector.cs index c63d9b90..30efb9a8 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FaultInjector.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FaultInjector.cs @@ -11,7 +11,6 @@ namespace DurableTask.Netherite.Faster using System.Threading.Tasks; using Azure.Storage.Blobs.Specialized; using FASTER.core; - using Microsoft.Azure.Storage; /// /// Injects faults into storage accesses. @@ -111,11 +110,6 @@ public async Task WaitForStartup(int numPartitions, TimeSpan timeout) await allDone; } - public async Task BreakLease(Microsoft.Azure.Storage.Blob.CloudBlockBlob blob) - { - await blob.BreakLeaseAsync(TimeSpan.Zero); - } - internal async Task BreakLease(BlobUtilsV12.BlockBlobClients blob) { await blob.Default.GetBlobLeaseClient().BreakAsync(TimeSpan.Zero); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs index f534db80..f89a7999 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs @@ -9,7 +9,6 @@ namespace DurableTask.Netherite.Faster using System.Linq; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Storage; using Microsoft.Extensions.Logging; class PartitionStorage : IPartitionState diff --git a/src/DurableTask.Netherite/Util/BlobUtils.cs b/src/DurableTask.Netherite/Util/BlobUtils.cs deleted file mode 100644 index 0538a62f..00000000 --- a/src/DurableTask.Netherite/Util/BlobUtils.cs +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask.Netherite -{ - using System; - using System.Net.Http; - using System.Threading; - using System.Threading.Tasks; - using Azure.Storage.Blobs.Models; - using DurableTask.Core.Common; - using Microsoft.Azure.Storage; - using Microsoft.Azure.Storage.Blob; - using Microsoft.Azure.Storage.Blob.Protocol; - - static class BlobUtils - { - /// - /// Forcefully deletes a blob. - /// - /// The CloudBlob to delete. - /// A task that completes when the operation is finished. - public static async Task ForceDeleteAsync(CloudBlob blob) - { - try - { - await blob.DeleteAsync(); - return true; - } - catch (StorageException e) when (BlobDoesNotExist(e)) - { - return false; - } - catch (StorageException e) when (CannotDeleteBlobWithLease(e)) - { - try - { - await blob.BreakLeaseAsync(TimeSpan.Zero).ConfigureAwait(false); - } - catch - { - // we ignore exceptions in the lease breaking since there could be races - } - - // retry the delete - try - { - await blob.DeleteAsync().ConfigureAwait(false); - return true; - } - catch (StorageException ex) when (BlobDoesNotExist(ex)) - { - return false; - } - } - } - - /// - /// Checks whether the given storage exception is transient, and - /// therefore meaningful to retry. - /// - /// The storage exception. - /// The cancellation token that was passed to the storage request. - /// Whether this is a transient storage exception. - public static bool IsTransientStorageError(Exception exception) - { - // handle Azure V11 SDK exceptions - if (exception is StorageException e && httpStatusIndicatesTransientError(e.RequestInformation?.HttpStatusCode)) - { - return true; - } - - // handle Azure V12 SDK exceptions - if (exception is Azure.RequestFailedException e1 && httpStatusIndicatesTransientError(e1.Status)) - { - return true; - } - - // Empirically observed: timeouts on synchronous calls - if (exception.InnerException is TimeoutException) - { - return true; - } - - // Empirically observed: transient cancellation exceptions that are not application initiated - if (exception is OperationCanceledException || exception.InnerException is OperationCanceledException) - { - return true; - } - - // Empirically observed: transient exception ('An existing connection was forcibly closed by the remote host') - if (exception.InnerException is System.Net.Http.HttpRequestException && exception.InnerException?.InnerException is System.IO.IOException) - { - return true; - } - - // Empirically observed: transient socket exceptions - if (exception is System.IO.IOException && exception.InnerException is System.Net.Sockets.SocketException) - { - return true; - } - - // Empirically observed: transient DNS failures - if (exception is Azure.RequestFailedException && exception.InnerException is System.Net.Http.HttpRequestException e2 && e2.Message.Contains("No such host is known")) - { - return true; - } - - // Empirically observed: socket exceptions under heavy stress, such as - // - "Only one usage of each socket address (protocol/network address/port) is normally permitted" - // - "An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full" - if (exception is Azure.RequestFailedException - && (exception.InnerException is System.Net.Http.HttpRequestException e3) - && e3.InnerException is System.Net.Sockets.SocketException) - { - return true; - } - - return false; - } - - /// - /// Checks whether the given exception is a timeout exception. - /// - /// The exception. - /// Whether this is a timeout storage exception. - public static bool IsTimeout(Exception exception) - { - return exception is System.TimeoutException - || (exception is StorageException e && e.RequestInformation?.HttpStatusCode == 408) //408 Request Timeout - || (exception is Azure.RequestFailedException e1 && (e1.Status == 408 || e1.ErrorCode == "OperationTimedOut")) - || (exception is TaskCanceledException & exception.Message.StartsWith("The operation was cancelled because it exceeded the configured timeout")); - } - - // Transient http status codes as documented at https://docs.microsoft.com/en-us/azure/architecture/best-practices/retry-service-specific#azure-storage - static bool httpStatusIndicatesTransientError(int? statusCode) => - (statusCode == 408 //408 Request Timeout - || statusCode == 429 //429 Too Many Requests - || statusCode == 500 //500 Internal Server Error - || statusCode == 502 //502 Bad Gateway - || statusCode == 503 //503 Service Unavailable - || statusCode == 504); //504 Gateway Timeout - - - // Lease error codes are documented at https://docs.microsoft.com/en-us/rest/api/storageservices/lease-blob - - public static bool LeaseConflictOrExpired(StorageException e) - { - return (e.RequestInformation?.HttpStatusCode == 409) || (e.RequestInformation?.HttpStatusCode == 412); - } - - public static bool LeaseConflict(StorageException e) - { - return (e.RequestInformation?.HttpStatusCode == 409); - } - - public static bool LeaseExpired(StorageException e) - { - return (e.RequestInformation?.HttpStatusCode == 412); - } - - public static bool CannotDeleteBlobWithLease(StorageException e) - { - return (e.RequestInformation?.HttpStatusCode == 412); - } - - public static bool BlobDoesNotExist(StorageException e) - { - var information = e.RequestInformation?.ExtendedErrorInformation; - return (e.RequestInformation?.HttpStatusCode == 404) && (information.ErrorCode.Equals(BlobErrorCodeStrings.BlobNotFound)); - } - - public static bool BlobAlreadyExists(StorageException e) - { - return (e.RequestInformation?.HttpStatusCode == 409); - } - } -} diff --git a/src/DurableTask.Netherite/Util/BlobUtilsV11.cs b/src/DurableTask.Netherite/Util/BlobUtilsV11.cs new file mode 100644 index 00000000..a8a984aa --- /dev/null +++ b/src/DurableTask.Netherite/Util/BlobUtilsV11.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite +{ + using System; + using System.Net.Http; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.Core.Common; + + /// + /// We still keep a few utility functions around from the Microsoft.Azure.Storage (V11) SDK. There are no V12 SDK equivalents. + /// We should consider replacing these with some alternative next time we do a major version bump, and remove the V11 dependency. + /// + static class BlobUtilsV11 + { + public static void ParseStorageConnectionString(string connectionString, out string accountName, out Uri tableEndpoint, out Uri blobEndpoint, out Uri queueEndpoint) + { + var cloudStorageAccount = Microsoft.Azure.Storage.CloudStorageAccount.Parse(connectionString); + + accountName = cloudStorageAccount.Credentials.AccountName; + tableEndpoint = cloudStorageAccount.TableEndpoint; + blobEndpoint = cloudStorageAccount.BlobEndpoint; + queueEndpoint = cloudStorageAccount.QueueEndpoint; + } + + public static void ValidateContainerName(string name) + { + Microsoft.Azure.Storage.NameValidator.ValidateContainerName(name.ToLowerInvariant()); + } + + public static void ValidateBlobName(string name) + { + Microsoft.Azure.Storage.NameValidator.ValidateBlobName(name); + } + } +} diff --git a/src/DurableTask.Netherite/Util/BlobUtilsV12.cs b/src/DurableTask.Netherite/Util/BlobUtilsV12.cs index 4248abb6..21c63a08 100644 --- a/src/DurableTask.Netherite/Util/BlobUtilsV12.cs +++ b/src/DurableTask.Netherite/Util/BlobUtilsV12.cs @@ -10,15 +10,19 @@ namespace DurableTask.Netherite using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; + using Azure; using Azure.Core; using Azure.Core.Pipeline; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; using Azure.Storage.Blobs.Specialized; + using Newtonsoft.Json; using static DurableTask.Netherite.TransportAbstraction; static class BlobUtilsV12 { + static readonly BlobUploadOptions JsonBlobUploadOptions = new() { HttpHeaders = new BlobHttpHeaders() { ContentType = "application/json" } }; + public class ServerTimeoutPolicy : HttpPipelineSynchronousPolicy { readonly int timeout; @@ -120,6 +124,27 @@ internal static PageBlobClients GetPageBlobClients(ContainerClients containerCli } + public static async Task ReadJsonBlobAsync(BlockBlobClient blobClient, bool throwIfNotFound, bool throwOnParseError, CancellationToken token) where T : class + { + try + { + var downloadResult = await blobClient.DownloadContentAsync(); + string blobContents = downloadResult.Value.Content.ToString(); + return JsonConvert.DeserializeObject(blobContents); + } + catch (RequestFailedException ex) + when (BlobUtilsV12.BlobDoesNotExist(ex) && !throwIfNotFound) + { + // container or blob does not exist + } + catch (JsonException) when (!throwOnParseError) + { + // cannot parse content of blob + } + + return null; + } + public struct BlobDirectory { readonly ContainerClients client; @@ -207,7 +232,85 @@ public static async Task ForceDeleteAsync(BlobContainerClient containerCli } } } - + + /// + /// Checks whether the given exception is a timeout exception. + /// + /// The exception. + /// Whether this is a timeout storage exception. + public static bool IsTimeout(Exception exception) + { + return exception is System.TimeoutException + || (exception is Azure.RequestFailedException e1 && (e1.Status == 408 || e1.ErrorCode == "OperationTimedOut")) + || (exception is TaskCanceledException & exception.Message.StartsWith("The operation was cancelled because it exceeded the configured timeout")); + } + + /// + /// Checks whether the given storage exception is transient, and + /// therefore meaningful to retry. + /// + /// The storage exception. + /// The cancellation token that was passed to the storage request. + /// Whether this is a transient storage exception. + public static bool IsTransientStorageError(Exception exception) + { + if (exception is Azure.RequestFailedException e1 && httpStatusIndicatesTransientError(e1.Status)) + { + return true; + } + + // Empirically observed: timeouts on synchronous calls + if (exception.InnerException is TimeoutException) + { + return true; + } + + // Empirically observed: transient cancellation exceptions that are not application initiated + if (exception is OperationCanceledException || exception.InnerException is OperationCanceledException) + { + return true; + } + + // Empirically observed: transient exception ('An existing connection was forcibly closed by the remote host') + if (exception.InnerException is System.Net.Http.HttpRequestException && exception.InnerException?.InnerException is System.IO.IOException) + { + return true; + } + + // Empirically observed: transient socket exceptions + if (exception is System.IO.IOException && exception.InnerException is System.Net.Sockets.SocketException) + { + return true; + } + + // Empirically observed: transient DNS failures + if (exception is Azure.RequestFailedException && exception.InnerException is System.Net.Http.HttpRequestException e2 && e2.Message.Contains("No such host is known")) + { + return true; + } + + // Empirically observed: socket exceptions under heavy stress, such as + // - "Only one usage of each socket address (protocol/network address/port) is normally permitted" + // - "An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full" + if (exception is Azure.RequestFailedException + && (exception.InnerException is System.Net.Http.HttpRequestException e3) + && e3.InnerException is System.Net.Sockets.SocketException) + { + return true; + } + + return false; + } + + static bool httpStatusIndicatesTransientError(int? statusCode) => + (statusCode == 408 //408 Request Timeout + || statusCode == 429 //429 Too Many Requests + || statusCode == 500 //500 Internal Server Error + || statusCode == 502 //502 Bad Gateway + || statusCode == 503 //503 Service Unavailable + || statusCode == 504); //504 Gateway Timeout + + // Lease error codes are documented at https://docs.microsoft.com/en-us/rest/api/storageservices/lease-blob public static bool LeaseConflictOrExpired(Azure.RequestFailedException e) @@ -240,9 +343,19 @@ public static bool BlobDoesNotExist(Azure.RequestFailedException e) return e.Status == 404 && e.ErrorCode == BlobErrorCode.BlobNotFound; } + public static bool BlobOrContainerDoesNotExist(Azure.RequestFailedException e) + { + return e.Status == 404 && (e.ErrorCode == BlobErrorCode.BlobNotFound || e.ErrorCode == BlobErrorCode.ContainerNotFound); + } + public static bool BlobAlreadyExists(Azure.RequestFailedException e) { return e.Status == 409; } + + public static bool IsCancelled(Azure.RequestFailedException e) + { + return e.InnerException != null && e.InnerException is OperationCanceledException; + } } } From c66e8349ad8b8e7333a5f8f4f5e2ec05a3951623 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Fri, 10 Jan 2025 13:05:00 -0800 Subject: [PATCH 2/2] remove dependency on Microsoft.Azure.Storage.Blob --- .../Connections/ConnectionInfo.cs | 3 +- .../DurableTask.Netherite.csproj | 1 - .../NetheriteOrchestrationServiceSettings.cs | 4 +- .../Scaling/AzureBlobLoadPublisher.cs | 4 +- .../Faster/FasterStorageProvider.cs | 2 +- .../Util/BlobUtilsV11.cs | 38 -------- .../Util/ConnectionStringParser.cs | 92 +++++++++++++++++++ .../Util/NameValidator.cs | 92 +++++++++++++++++++ .../LoadPublisherTests.cs | 13 ++- .../Benchmarks/FileHash/FileOrchestration.cs | 2 - .../Benchmarks/FileHash/HashActivity.cs | 2 - .../Benchmarks/WordCount/HttpTriggers.cs | 1 - 12 files changed, 198 insertions(+), 56 deletions(-) delete mode 100644 src/DurableTask.Netherite/Util/BlobUtilsV11.cs create mode 100644 src/DurableTask.Netherite/Util/ConnectionStringParser.cs create mode 100644 src/DurableTask.Netherite/Util/NameValidator.cs diff --git a/src/DurableTask.Netherite/Connections/ConnectionInfo.cs b/src/DurableTask.Netherite/Connections/ConnectionInfo.cs index 77b015d5..358deb08 100644 --- a/src/DurableTask.Netherite/Connections/ConnectionInfo.cs +++ b/src/DurableTask.Netherite/Connections/ConnectionInfo.cs @@ -7,6 +7,7 @@ namespace DurableTask.Netherite using System.Threading; using System.Threading.Tasks; using Azure.Core; + using DurableTask.Netherite.Util; /// /// Internal abstraction used for capturing connection information and credentials. @@ -73,7 +74,7 @@ public static ConnectionInfo FromEventHubsConnectionString(string connectionStri /// The connection info. public static ConnectionInfo FromStorageConnectionString(string connectionString, ConnectionResolver.ResourceType resourceType) { - BlobUtilsV11.ParseStorageConnectionString(connectionString, out string accountName, out Uri tableEndpoint, out Uri blobEndpoint, out Uri queueEndpoint); + ConnectionStringParser.ParseStorageConnectionString(connectionString, out string accountName, out Uri tableEndpoint, out Uri blobEndpoint); return new ConnectionInfo() { diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index b450c670..bc6aec92 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -53,7 +53,6 @@ - diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 5daa3b36..8b15a170 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -502,8 +502,8 @@ public static void ValidateTaskhubName(string taskhubName) try { - BlobUtilsV11.ValidateContainerName(taskhubName.ToLowerInvariant()); - BlobUtilsV11.ValidateBlobName(taskhubName); + Util.NameValidator.ValidateContainerName(taskhubName.ToLowerInvariant()); + Util.NameValidator.ValidateBlobName(taskhubName); } catch (ArgumentException e) { diff --git a/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs b/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs index 7ce334b8..41f5fcea 100644 --- a/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs +++ b/src/DurableTask.Netherite/Scaling/AzureBlobLoadPublisher.cs @@ -56,6 +56,8 @@ public Task CreateIfNotExistsAsync(CancellationToken cancellationToken) async ValueTask LoadParameters(bool throwIfNotFound, CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + if (this.parameters == null) { this.parameters = await ReadJsonBlobAsync( @@ -76,7 +78,7 @@ static async Task ReadJsonBlobAsync(BlockBlobClient blobClient, bool throw return JsonConvert.DeserializeObject(blobContents); } catch (RequestFailedException ex) - when (BlobUtilsV12.BlobDoesNotExist(ex) && !throwIfNotFound) + when (BlobUtilsV12.BlobOrContainerDoesNotExist(ex) && !throwIfNotFound) { // container or blob does not exist } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs index 837f2223..964f750d 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs @@ -130,7 +130,7 @@ async Task IStorageLayer.CreateTaskhubIfNotExistsAsync() } Response response = await this.containerClient.CreateIfNotExistsAsync(); - if (response.HasValue) + if (response != null) { this.traceHelper.TraceProgress($"Created new blob container at {this.containerClient.Uri}"); } diff --git a/src/DurableTask.Netherite/Util/BlobUtilsV11.cs b/src/DurableTask.Netherite/Util/BlobUtilsV11.cs deleted file mode 100644 index a8a984aa..00000000 --- a/src/DurableTask.Netherite/Util/BlobUtilsV11.cs +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace DurableTask.Netherite -{ - using System; - using System.Net.Http; - using System.Threading; - using System.Threading.Tasks; - using DurableTask.Core.Common; - - /// - /// We still keep a few utility functions around from the Microsoft.Azure.Storage (V11) SDK. There are no V12 SDK equivalents. - /// We should consider replacing these with some alternative next time we do a major version bump, and remove the V11 dependency. - /// - static class BlobUtilsV11 - { - public static void ParseStorageConnectionString(string connectionString, out string accountName, out Uri tableEndpoint, out Uri blobEndpoint, out Uri queueEndpoint) - { - var cloudStorageAccount = Microsoft.Azure.Storage.CloudStorageAccount.Parse(connectionString); - - accountName = cloudStorageAccount.Credentials.AccountName; - tableEndpoint = cloudStorageAccount.TableEndpoint; - blobEndpoint = cloudStorageAccount.BlobEndpoint; - queueEndpoint = cloudStorageAccount.QueueEndpoint; - } - - public static void ValidateContainerName(string name) - { - Microsoft.Azure.Storage.NameValidator.ValidateContainerName(name.ToLowerInvariant()); - } - - public static void ValidateBlobName(string name) - { - Microsoft.Azure.Storage.NameValidator.ValidateBlobName(name); - } - } -} diff --git a/src/DurableTask.Netherite/Util/ConnectionStringParser.cs b/src/DurableTask.Netherite/Util/ConnectionStringParser.cs new file mode 100644 index 00000000..6cd77163 --- /dev/null +++ b/src/DurableTask.Netherite/Util/ConnectionStringParser.cs @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite.Util +{ + using System; + using System.Collections.Generic; + using System.Globalization; + using System.Text.RegularExpressions; + + // The code in this class is copied from the previous Azure Storage client SDKs, + // which, unlike the newer SDKs, supported connection string parsing. + static class ConnectionStringParser + { + public static void ParseStorageConnectionString(string connectionString, out string accountName, out Uri tableEndpoint, out Uri blobEndpoint) + { + if (string.IsNullOrEmpty(connectionString)) + { + throw new ArgumentNullException("connectionString"); + } + + // parse the string into individual property settings + + IDictionary connectionStringParameters = new Dictionary(); + string[] propertySettings = connectionString.Split(new char[1] { ';' }, StringSplitOptions.RemoveEmptyEntries); + for (int i = 0; i < propertySettings.Length; i++) + { + string[] leftAndRightHandSide = propertySettings[i].Split(new char[1] { '=' }, 2); + if (leftAndRightHandSide.Length != 2) + { + throw new FormatException("Settings must be of the form \"name=value\"."); + } + + if (connectionStringParameters.ContainsKey(leftAndRightHandSide[0])) + { + throw new FormatException(string.Format(CultureInfo.InvariantCulture, "Duplicate setting '{0}' found.", leftAndRightHandSide[0])); + } + + connectionStringParameters.Add(leftAndRightHandSide[0], leftAndRightHandSide[1]); + } + + if (connectionStringParameters.TryGetValue("UseDevelopmentStorage", out string useDevStorageString) + && bool.TryParse(useDevStorageString, out bool useDevStorage) + && useDevStorage) + { + accountName = "devstoreaccount1"; + + UriBuilder uriBuilder; + + if (connectionStringParameters.TryGetValue("DevelopmentStorageProxyUri", out string proxyUriString)) + { + Uri proxyUri = new Uri(proxyUriString); + uriBuilder = new UriBuilder(proxyUri.Scheme, proxyUri.Host); + } + else + { + uriBuilder = new UriBuilder("http", "127.0.0.1"); + } + + uriBuilder.Path = accountName; + uriBuilder.Port = 10000; + blobEndpoint = uriBuilder.Uri; + uriBuilder.Port = 10002; + tableEndpoint = uriBuilder.Uri; + } + else + { + // determine the account name + + if (!connectionStringParameters.TryGetValue("AccountName", out accountName)) + { + throw new FormatException("missing connection string parameter: AccountName"); + } + + // construct the service endpoints + + if (!connectionStringParameters.TryGetValue("DefaultEndpointsProtocol", out string scheme)) + { + throw new FormatException("missing connection string parameter: DefaultEndpointsProtocol"); + } + + if (!connectionStringParameters.TryGetValue("EndpointSuffix", out string endpointSuffix)) + { + endpointSuffix = "core.windows.net"; + } + + blobEndpoint = new Uri(string.Format(CultureInfo.InvariantCulture, "{0}://{1}.{2}.{3}/", scheme, accountName, "blob", endpointSuffix)); + tableEndpoint = new Uri(string.Format(CultureInfo.InvariantCulture, "{0}://{1}.{2}.{3}/", scheme, accountName, "table", endpointSuffix)); + } + } + } +} diff --git a/src/DurableTask.Netherite/Util/NameValidator.cs b/src/DurableTask.Netherite/Util/NameValidator.cs new file mode 100644 index 00000000..b38884d4 --- /dev/null +++ b/src/DurableTask.Netherite/Util/NameValidator.cs @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite.Util +{ + using System; + using System.Globalization; + using System.Text.RegularExpressions; + + // This class is copied from the previous Azure Storage client SDKs + // The following logic may require updating over time as the Azure Storage team discourages client-side validation + // that may grow stale as the server evolves. See here: https://github.com/Azure/azure-sdk-for-js/issues/13519#issuecomment-822420305 + static class NameValidator + { + static readonly RegexOptions RegexOptions = RegexOptions.ExplicitCapture | RegexOptions.Singleline | RegexOptions.CultureInvariant; + + static readonly Regex MetricsTableRegex = new Regex("^\\$Metrics(HourPrimary|MinutePrimary|HourSecondary|MinuteSecondary)?(Transactions)(Blob|Queue|Table)$", RegexOptions); + static readonly Regex ShareContainerQueueRegex = new Regex("^[a-z0-9]+(-[a-z0-9]+)*$", RegexOptions); + static readonly Regex TableRegex = new Regex("^[A-Za-z][A-Za-z0-9]*$", RegexOptions); + + public static void ValidateBlobName(string blobName) + { + if (string.IsNullOrWhiteSpace(blobName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. The {0} name may not be null, empty, or whitespace only.", "blob")); + } + + if (blobName.Length < 1 || blobName.Length > 1024) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name length. The {0} name must be between {1} and {2} characters long.", "blob", 1, 1024)); + } + + int num = 0; + for (int i = 0; i < blobName.Length; i++) + { + if (blobName[i] == '/') + { + num++; + } + } + + if (num >= 254) + { + throw new ArgumentException("The count of URL path segments (strings between '/' characters) as part of the blob name cannot exceed 254."); + } + } + + public static void ValidateContainerName(string containerName) + { + if (!"$root".Equals(containerName, StringComparison.Ordinal) && !"$logs".Equals(containerName, StringComparison.Ordinal)) + { + ValidateShareContainerQueueHelper(containerName, "container"); + } + } + + public static void ValidateTableName(string tableName) + { + if (string.IsNullOrWhiteSpace(tableName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. The {0} name may not be null, empty, or whitespace only.", "table")); + } + + if (tableName.Length < 3 || tableName.Length > 63) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name length. The {0} name must be between {1} and {2} characters long.", "table", 3, 63)); + } + + if (!TableRegex.IsMatch(tableName) && !MetricsTableRegex.IsMatch(tableName) && !tableName.Equals("$MetricsCapacityBlob", StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. Check MSDN for more information about valid {0} naming.", "table")); + } + } + + static void ValidateShareContainerQueueHelper(string resourceName, string resourceType) + { + if (string.IsNullOrWhiteSpace(resourceName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. The {0} name may not be null, empty, or whitespace only.", resourceType)); + } + + if (resourceName.Length < 3 || resourceName.Length > 63) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name length. The {0} name must be between {1} and {2} characters long.", resourceType, 3, 63)); + } + + if (!ShareContainerQueueRegex.IsMatch(resourceName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. Check MSDN for more information about valid {0} naming.", resourceType)); + } + } + } +} diff --git a/test/DurableTask.Netherite.Tests/LoadPublisherTests.cs b/test/DurableTask.Netherite.Tests/LoadPublisherTests.cs index bbee2e99..2c81dde7 100644 --- a/test/DurableTask.Netherite.Tests/LoadPublisherTests.cs +++ b/test/DurableTask.Netherite.Tests/LoadPublisherTests.cs @@ -14,7 +14,6 @@ namespace DurableTask.Netherite.Tests using DurableTask.Core; using DurableTask.Netherite.Faster; using DurableTask.Netherite.Scaling; - using Microsoft.Azure.Storage.Blob; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using Newtonsoft.Json.Serialization; @@ -47,11 +46,11 @@ async Task NothingLeftBehind() { // there should not be anything left inside the blob container var blobContainerName = taskHub.ToLowerInvariant() + "-storage"; - var cloudBlobClient = Microsoft.Azure.Storage.CloudStorageAccount.Parse(Environment.GetEnvironmentVariable(TestConstants.StorageConnectionName)).CreateCloudBlobClient(); - var cloudBlobContainer = cloudBlobClient.GetContainerReference(blobContainerName); - if (await cloudBlobContainer.ExistsAsync()) + var blobServiceClient = new Azure.Storage.Blobs.BlobServiceClient(Environment.GetEnvironmentVariable(TestConstants.StorageConnectionName)); + var blobContainerClient = blobServiceClient.GetBlobContainerClient(blobContainerName); + if (await blobContainerClient.ExistsAsync()) { - var allBlobs = cloudBlobContainer.ListBlobs().ToList(); + var allBlobs = await blobContainerClient.GetBlobsAsync().ToListAsync(); Assert.Empty(allBlobs); } } @@ -243,7 +242,7 @@ public async Task PublishToMissingTable() public async Task PublishToMissingTaskHub() { IStorageLayer taskhub = this.GetFreshTaskHub(null); - await Assert.ThrowsAnyAsync(() => taskhub.LoadPublisher.PublishAsync(new Dictionary() { { 1, this.Create("Y") } }, CancellationToken.None)); + await Assert.ThrowsAnyAsync(() => taskhub.LoadPublisher.PublishAsync(new Dictionary() { { 1, this.Create("Y") } }, CancellationToken.None)); } [Fact] @@ -257,7 +256,7 @@ public async Task QueryFromMissingTable() public async Task QueryFromMissingTaskHub() { IStorageLayer taskhub = this.GetFreshTaskHub(null); - await Assert.ThrowsAnyAsync(() => taskhub.LoadPublisher.QueryAsync(CancellationToken.None)); + await Assert.ThrowsAnyAsync(() => taskhub.LoadPublisher.QueryAsync(CancellationToken.None)); } } } diff --git a/test/PerformanceTests/Benchmarks/FileHash/FileOrchestration.cs b/test/PerformanceTests/Benchmarks/FileHash/FileOrchestration.cs index acb8ad42..4f1fd081 100644 --- a/test/PerformanceTests/Benchmarks/FileHash/FileOrchestration.cs +++ b/test/PerformanceTests/Benchmarks/FileHash/FileOrchestration.cs @@ -8,8 +8,6 @@ namespace PerformanceTests.FileHash using Microsoft.Extensions.Logging; using System.Collections.Generic; using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Microsoft.Azure.Storage; - using Microsoft.Azure.Storage.Blob; using System.Linq; /// diff --git a/test/PerformanceTests/Benchmarks/FileHash/HashActivity.cs b/test/PerformanceTests/Benchmarks/FileHash/HashActivity.cs index 84d37df8..800fbdda 100644 --- a/test/PerformanceTests/Benchmarks/FileHash/HashActivity.cs +++ b/test/PerformanceTests/Benchmarks/FileHash/HashActivity.cs @@ -7,8 +7,6 @@ namespace PerformanceTests.FileHash using System.Threading.Tasks; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Microsoft.Azure.Storage; - using Microsoft.Azure.Storage.Blob; using System.Linq; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; diff --git a/test/PerformanceTests/Benchmarks/WordCount/HttpTriggers.cs b/test/PerformanceTests/Benchmarks/WordCount/HttpTriggers.cs index ed675787..570ceaca 100644 --- a/test/PerformanceTests/Benchmarks/WordCount/HttpTriggers.cs +++ b/test/PerformanceTests/Benchmarks/WordCount/HttpTriggers.cs @@ -12,7 +12,6 @@ namespace PerformanceTests.WordCount using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Microsoft.Azure.WebJobs.Extensions.DurableTask; - using Microsoft.Azure.Storage; using System.Collections.Generic; using System.Text; using DurableTask.Netherite.Faster;