diff --git a/src/AWS/Orleans.Streaming.SQS/Hosting/ClientBuilderExtensions.cs b/src/AWS/Orleans.Streaming.SQS/Hosting/ClientBuilderExtensions.cs
new file mode 100644
index 00000000000..f6724eb735b
--- /dev/null
+++ b/src/AWS/Orleans.Streaming.SQS/Hosting/ClientBuilderExtensions.cs
@@ -0,0 +1,46 @@
+
+using System;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.Configuration;
+using Orleans.Hosting;
+using OrleansAWSUtils.Streams;
+
+namespace Orleans.Hosting
+{
+ public static class ClientBuilderExtensions
+ {
+ ///
+ /// Configure cluster client to use SQS persistent streams.
+ ///
+ public static IClientBuilder AddSqsStreams(this IClientBuilder builder, string name, Action configureOptions)
+ {
+ return builder.ConfigureServices(services => services.AddClusterClientSqsStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use SQS persistent streams.
+ ///
+ public static IClientBuilder AddSqsStreams(this IClientBuilder builder, string name, Action> configureOptions = null)
+ {
+ return builder.ConfigureServices(services => services.AddClusterClientSqsStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use SQS persistent streams.
+ ///
+ public static IServiceCollection AddClusterClientSqsStreams(this IServiceCollection services, string name, Action configureOptions)
+ {
+ return services.AddClusterClientSqsStreams(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use SQS persistent streams.
+ ///
+ public static IServiceCollection AddClusterClientSqsStreams(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ {
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddClusterClientPersistentStreams(name, SQSAdapterFactory.Create, configureOptions);
+ }
+ }
+}
diff --git a/src/AWS/Orleans.Streaming.SQS/Hosting/SiloBuilderExtensions.cs b/src/AWS/Orleans.Streaming.SQS/Hosting/SiloBuilderExtensions.cs
new file mode 100644
index 00000000000..4a89d775072
--- /dev/null
+++ b/src/AWS/Orleans.Streaming.SQS/Hosting/SiloBuilderExtensions.cs
@@ -0,0 +1,45 @@
+using System;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.Configuration;
+using Orleans.Hosting;
+using OrleansAWSUtils.Streams;
+
+namespace Orleans.Hosting
+{
+ public static class SiloBuilderExtensions
+ {
+ ///
+ /// Configure silo to use SQS persistent streams.
+ ///
+ public static ISiloHostBuilder AddSqsStreams(this ISiloHostBuilder builder, string name, Action configureOptions)
+ {
+ return builder.ConfigureServices(services => services.AddSiloSqsStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use SQS persistent streams.
+ ///
+ public static ISiloHostBuilder AddSqsStreams(this ISiloHostBuilder builder, string name, Action> configureOptions = null)
+ {
+ return builder.ConfigureServices(services => services.AddSiloSqsStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use SQS persistent streams.
+ ///
+ public static IServiceCollection AddSiloSqsStreams(this IServiceCollection services, string name, Action configureOptions)
+ {
+ return services.AddSiloSqsStreams(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure silo to use SQS persistent streams.
+ ///
+ public static IServiceCollection AddSiloSqsStreams(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ {
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddSiloPersistentStreams(name, SQSAdapterFactory.Create, configureOptions);
+ }
+ }
+}
diff --git a/src/AWS/Orleans.Streaming.SQS/Orleans.Streaming.SQS.csproj b/src/AWS/Orleans.Streaming.SQS/Orleans.Streaming.SQS.csproj
index 5ecbfa40234..d0cbae7f06f 100644
--- a/src/AWS/Orleans.Streaming.SQS/Orleans.Streaming.SQS.csproj
+++ b/src/AWS/Orleans.Streaming.SQS/Orleans.Streaming.SQS.csproj
@@ -22,6 +22,7 @@
+
diff --git a/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterFactory.cs b/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterFactory.cs
index 9732ea9d132..254d2da8ab4 100644
--- a/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterFactory.cs
+++ b/src/AWS/Orleans.Streaming.SQS/Streams/SQSAdapterFactory.cs
@@ -1,60 +1,46 @@
-using Orleans.Providers;
-using Orleans.Providers.Streams.Common;
-using Orleans.Runtime;
-using Orleans.Streams;
-using System;
+using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Orleans.Providers.Streams.Common;
+using Orleans.Streams;
using Orleans.Serialization;
+using Orleans.Configuration;
namespace OrleansAWSUtils.Streams
{
/// Factory class for Azure Queue based stream provider.
public class SQSAdapterFactory : IQueueAdapterFactory
{
- internal const int CacheSizeDefaultValue = 4096;
- internal const string NumQueuesPropertyName = "NumQueues";
-
- /// Default number of Azure Queue used in this stream provider.
- public const int NumQueuesDefaultValue = 8; // keep as power of 2.
-
- private string deploymentId;
- private string dataConnectionString;
- private string providerName;
- private int cacheSize;
- private int numQueues;
+ private readonly string providerName;
+ private readonly SqsStreamOptions options;
+ private readonly SiloOptions siloOptions;
+ private readonly SerializationManager serializationManager;
+ private readonly ILoggerFactory loggerFactory;
private HashRingBasedStreamQueueMapper streamQueueMapper;
private IQueueAdapterCache adapterCache;
- private SerializationManager serializationManager;
- private ILoggerFactory loggerFactory;
- /// "DataConnectionString".
- public const string DataConnectionStringPropertyName = "DataConnectionString";
- /// "DeploymentId".
- public const string DeploymentIdPropertyName = "DeploymentId";
///
/// Application level failure handler override.
///
protected Func> StreamFailureHandlerFactory { private get; set; }
- /// Init the factory.
- public virtual void Init(IProviderConfiguration config, string providerName, IServiceProvider serviceProvider)
+ public SQSAdapterFactory(string name, SqsStreamOptions options, IServiceProvider serviceProvider, IOptions siloOptions, SerializationManager serializationManager, ILoggerFactory loggerFactory)
{
- if (config == null) throw new ArgumentNullException("config");
- if (!config.Properties.TryGetValue(DataConnectionStringPropertyName, out dataConnectionString))
- throw new ArgumentException(string.Format("{0} property not set", DataConnectionStringPropertyName));
- if (!config.Properties.TryGetValue(DeploymentIdPropertyName, out deploymentId))
- throw new ArgumentException(string.Format("{0} property not set", DeploymentIdPropertyName));
+ this.providerName = name;
+ this.options = options;
+ this.siloOptions = siloOptions.Value;
+ this.serializationManager = serializationManager;
+ this.loggerFactory = loggerFactory;
+ }
- cacheSize = SimpleQueueAdapterCache.ParseSize(config, CacheSizeDefaultValue);
- this.loggerFactory = serviceProvider.GetRequiredService();
- numQueues = config.GetIntProperty(NumQueuesPropertyName, NumQueuesDefaultValue);
- this.providerName = providerName;
- streamQueueMapper = new HashRingBasedStreamQueueMapper(numQueues, providerName);
- adapterCache = new SimpleQueueAdapterCache(cacheSize, providerName, serviceProvider.GetRequiredService());
- this.serializationManager = serviceProvider.GetRequiredService();
+ /// Init the factory.
+ public virtual void Init()
+ {
+ streamQueueMapper = new HashRingBasedStreamQueueMapper(this.options.NumQueues, this.providerName);
+ adapterCache = new SimpleQueueAdapterCache(this.options.CacheSize, this.providerName, this.loggerFactory);
if (StreamFailureHandlerFactory == null)
{
StreamFailureHandlerFactory =
@@ -65,7 +51,7 @@ public virtual void Init(IProviderConfiguration config, string providerName, ISe
/// Creates the Azure Queue based adapter.
public virtual Task CreateAdapter()
{
- var adapter = new SQSAdapter(this.serializationManager, streamQueueMapper, this.loggerFactory, dataConnectionString, deploymentId, providerName);
+ var adapter = new SQSAdapter(this.serializationManager, this.streamQueueMapper, this.loggerFactory, this.options.ConnectionString, this.options.ClusterId ?? this.siloOptions.ClusterId, this.providerName);
return Task.FromResult(adapter);
}
@@ -90,5 +76,13 @@ public Task GetDeliveryFailureHandler(QueueId queueId)
{
return StreamFailureHandlerFactory(queueId);
}
+
+ public static SQSAdapterFactory Create(IServiceProvider services, string name)
+ {
+ IOptionsSnapshot streamOptionsSnapshot = services.GetRequiredService>();
+ var factory = ActivatorUtilities.CreateInstance(services, name, streamOptionsSnapshot.Get(name));
+ factory.Init();
+ return factory;
+ }
}
}
diff --git a/src/AWS/Orleans.Streaming.SQS/Streams/SQSStreamProvider.cs b/src/AWS/Orleans.Streaming.SQS/Streams/SQSStreamProvider.cs
deleted file mode 100644
index 2d3679a8c14..00000000000
--- a/src/AWS/Orleans.Streaming.SQS/Streams/SQSStreamProvider.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-using Orleans.Providers.Streams.Common;
-using OrleansAWSUtils.Streams;
-
-namespace Orleans.Providers.Streams
-{
- ///
- /// Persistent stream provider that uses azure queue for persistence
- ///
- public class SQSStreamProvider : PersistentStreamProvider
- {
- }
-}
diff --git a/src/AWS/Orleans.Streaming.SQS/Streams/SQSStreamProviderUtils.cs b/src/AWS/Orleans.Streaming.SQS/Streams/SQSStreamProviderUtils.cs
index ae537a58700..e52402e45b4 100644
--- a/src/AWS/Orleans.Streaming.SQS/Streams/SQSStreamProviderUtils.cs
+++ b/src/AWS/Orleans.Streaming.SQS/Streams/SQSStreamProviderUtils.cs
@@ -1,9 +1,10 @@
-using Orleans.Streams;
-using OrleansAWSUtils.Storage;
-using System.Collections.Generic;
+using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
+using Orleans.Streams;
+using OrleansAWSUtils.Storage;
+using Orleans.Configuration;
namespace OrleansAWSUtils.Streams
{
@@ -20,7 +21,7 @@ public static async Task DeleteAllUsedQueues(string providerName, string cluster
{
if (clusterId != null)
{
- var queueMapper = new HashRingBasedStreamQueueMapper(SQSAdapterFactory.NumQueuesDefaultValue, providerName);
+ var queueMapper = new HashRingBasedStreamQueueMapper(SqsStreamOptions.NumQueuesDefaultValue, providerName);
List allQueues = queueMapper.GetAllQueues().ToList();
var deleteTasks = new List();
diff --git a/src/AWS/Orleans.Streaming.SQS/Streams/SqsStreamOptions.cs b/src/AWS/Orleans.Streaming.SQS/Streams/SqsStreamOptions.cs
new file mode 100644
index 00000000000..a9649d41384
--- /dev/null
+++ b/src/AWS/Orleans.Streaming.SQS/Streams/SqsStreamOptions.cs
@@ -0,0 +1,17 @@
+
+namespace Orleans.Configuration
+{
+ public class SqsStreamOptions : PersistentStreamOptions
+ {
+ public string ClusterId { get; set; }
+
+ [Redact]
+ public string ConnectionString { get; set; }
+
+ public int CacheSize { get; set; } = CacheSizeDefaultValue;
+ public const int CacheSizeDefaultValue = 4096;
+
+ public int NumQueues { get; set; } = NumQueuesDefaultValue;
+ public const int NumQueuesDefaultValue = 8; // keep as power of 2.
+ }
+}
diff --git a/src/AdoNet/Orleans.Persistence.AdoNet/Storage/Provider/SiloBuilderExtensions.cs b/src/AdoNet/Orleans.Persistence.AdoNet/Storage/Provider/SiloBuilderExtensions.cs
index dfa1b41fc74..71f41ff06b5 100644
--- a/src/AdoNet/Orleans.Persistence.AdoNet/Storage/Provider/SiloBuilderExtensions.cs
+++ b/src/AdoNet/Orleans.Persistence.AdoNet/Storage/Provider/SiloBuilderExtensions.cs
@@ -1,6 +1,4 @@
using System;
-using System.Collections.Generic;
-using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
diff --git a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs
index 716de35f2ba..fea85203fd3 100644
--- a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs
+++ b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorage.cs
@@ -1,5 +1,4 @@
using System;
-using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
@@ -10,8 +9,7 @@
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
-using Orleans.Hosting;
-using Orleans.Providers;
+using Orleans.Configuration;
using Orleans.Providers.Azure;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
diff --git a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs
index 528b4a7257a..705a6c72367 100644
--- a/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs
+++ b/src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureBlobStorageOptions.cs
@@ -1,13 +1,13 @@
-using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using Microsoft.Extensions.Options;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
using Orleans.Persistence.AzureStorage;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
-using System;
-using System.Collections.Generic;
-namespace Orleans.Hosting
+namespace Orleans.Configuration
{
public class AzureBlobStorageOptions
{
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Hosting/ClientBuilderExtensions.cs b/src/Azure/Orleans.Streaming.AzureStorage/Hosting/ClientBuilderExtensions.cs
new file mode 100644
index 00000000000..d900c88217f
--- /dev/null
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Hosting/ClientBuilderExtensions.cs
@@ -0,0 +1,49 @@
+
+using System;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.Configuration;
+using Orleans.Providers.Streams.AzureQueue;
+
+namespace Orleans.Hosting
+{
+ public static class ClientBuilderExtensions
+ {
+ ///
+ /// Configure cluster client to use azure queue persistent streams.
+ ///
+ public static IClientBuilder AddAzureQueueStreams(this IClientBuilder builder, string name, Action configureOptions)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return builder.ConfigureServices(services => services.AddClusterClientAzureQueueStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use azure queue persistent streams.
+ ///
+ public static IClientBuilder AddAzureQueueStreams(this IClientBuilder builder, string name, Action> configureOptions = null)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return builder.ConfigureServices(services => services.AddClusterClientAzureQueueStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use azure queue persistent streams.
+ ///
+ public static IServiceCollection AddClusterClientAzureQueueStreams(this IServiceCollection services, string name, Action configureOptions)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return services.AddClusterClientAzureQueueStreams(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use azure queue persistent streams.
+ ///
+ public static IServiceCollection AddClusterClientAzureQueueStreams(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddClusterClientPersistentStreams(name, AzureQueueAdapterFactory.Create, configureOptions);
+ }
+ }
+}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Hosting/SiloBuilderExtensions.cs b/src/Azure/Orleans.Streaming.AzureStorage/Hosting/SiloBuilderExtensions.cs
new file mode 100644
index 00000000000..107c61bf52b
--- /dev/null
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Hosting/SiloBuilderExtensions.cs
@@ -0,0 +1,49 @@
+using System;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.Configuration;
+using Orleans.Hosting;
+using Orleans.Providers.Streams.AzureQueue;
+
+namespace Orleans.Hosting
+{
+ public static class SiloBuilderExtensions
+ {
+ ///
+ /// Configure silo to use azure queue persistent streams.
+ ///
+ public static ISiloHostBuilder AddAzureQueueStreams(this ISiloHostBuilder builder, string name, Action configureOptions)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return builder.ConfigureServices(services => services.AddSiloAzureQueueStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use azure queue persistent streams.
+ ///
+ public static ISiloHostBuilder AddAzureQueueStreams(this ISiloHostBuilder builder, string name, Action> configureOptions = null)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return builder.ConfigureServices(services => services.AddSiloAzureQueueStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use azure queue persistent streams.
+ ///
+ public static IServiceCollection AddSiloAzureQueueStreams(this IServiceCollection services, string name, Action configureOptions)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return services.AddSiloAzureQueueStreams(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure silo to use azure queue persistent streams.
+ ///
+ public static IServiceCollection AddSiloAzureQueueStreams(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ where TDataAdapter : IAzureQueueDataAdapter
+ {
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddSiloPersistentStreams(name, AzureQueueAdapterFactory.Create, configureOptions);
+ }
+ }
+}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj b/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj
index f2d57ea24c4..d57c2332f20 100644
--- a/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj
@@ -26,7 +26,7 @@
-
+
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/AzureConfigurationExtensions.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/AzureConfigurationExtensions.cs
deleted file mode 100644
index 3a9b15058a8..00000000000
--- a/src/Azure/Orleans.Streaming.AzureStorage/Providers/AzureConfigurationExtensions.cs
+++ /dev/null
@@ -1,173 +0,0 @@
-using System;
-using System.Collections.Generic;
-using Orleans.Providers.Streams.AzureQueue;
-using Orleans.Providers.Streams.Common;
-using Orleans.Streams;
-
-namespace Orleans.Runtime.Configuration
-{
- ///
- /// Extension methods for configuration classes specific to Orleans.Streaming.AzureStorage.dll
- ///
- public static class AzureConfigurationExtensions
- {
- ///
- /// Adds a stream provider of type .
- ///
- /// The cluster configuration object to add provider to.
- /// The provider name
- /// The azure storage connection string. If none is provided, it will use the same as in the Globals configuration.
- /// The number of queues to use as partitions.
- /// The ClusterId used for partitioning. If none is specified, the provider will use the same ClusterId as the Cluster.
- /// The cache size.
- /// The startup state of the persistent stream provider.
- /// Settings related to all persistent stream providers.
- public static void AddAzureQueueStreamProvider(
- this ClusterConfiguration config,
- string providerName,
- string connectionString = null,
- int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
- string clusterId = null,
- int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
-#pragma warning disable CS0618 // Type or member is obsolete
- PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
-#pragma warning restore CS0618 // Type or member is obsolete
- PersistentStreamProviderConfig persistentStreamProviderConfig = null)
- {
- connectionString = GetConnectionString(connectionString, config);
- clusterId = clusterId ?? config.Globals.ClusterId;
- var properties = GetAzureQueueStreamProviderProperties(providerName, connectionString, numberOfQueues, clusterId, cacheSize, startupState, persistentStreamProviderConfig);
-#pragma warning disable 618
- config.Globals.RegisterStreamProvider(providerName, properties);
-#pragma warning restore 618
- }
-
- ///
- /// Adds a stream provider of type .
- ///
- /// The cluster configuration object to add provider to.
- /// The provider name
- /// The azure storage connection string. If none is provided, it will use the same as in the Globals configuration.
- /// The number of queues to use as partitions.
- /// The ClusterId used for partitioning. If none is specified, the provider will use the same ClusterId as the Cluster.
- /// The cache size.
- /// The startup state of the persistent stream provider.
- /// Settings related to all persistent stream providers.
- public static void AddAzureQueueStreamProviderV2(
- this ClusterConfiguration config,
- string providerName,
- string connectionString = null,
- int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
- string clusterId = null,
- int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
-#pragma warning disable 618
- PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
-#pragma warning restore 618
- PersistentStreamProviderConfig persistentStreamProviderConfig = null)
- {
- connectionString = GetConnectionString(connectionString, config);
- clusterId = clusterId ?? config.Globals.ClusterId;
- var properties = GetAzureQueueStreamProviderProperties(providerName, connectionString, numberOfQueues, clusterId, cacheSize, startupState, persistentStreamProviderConfig);
- config.Globals.RegisterStreamProvider(providerName, properties);
- }
-
- ///
- /// Adds a stream provider of type .
- ///
- /// The cluster configuration object to add provider to.
- /// The provider name
- /// The azure storage connection string. If none is provided, it will use the same as in the Globals configuration.
- /// The number of queues to use as partitions.
- /// The ClusterId used for partitioning. If none is specified, the provider will use the same ClusterId as the Cluster.
- /// The cache size.
- /// The startup state of the persistent stream provider.
- /// Settings related to all persistent stream providers.
- public static void AddAzureQueueStreamProvider(
- this ClientConfiguration config,
- string providerName,
- string connectionString = null,
- int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
- string clusterId = null,
- int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
-#pragma warning disable 618
- PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
-#pragma warning restore 618
- PersistentStreamProviderConfig persistentStreamProviderConfig = null)
- {
- connectionString = GetConnectionString(connectionString, config);
- clusterId = clusterId ?? config.ClusterId;
- var properties = GetAzureQueueStreamProviderProperties(providerName, connectionString, numberOfQueues, clusterId, cacheSize, startupState, persistentStreamProviderConfig);
-#pragma warning disable 618
- config.RegisterStreamProvider(providerName, properties);
-#pragma warning restore 618
- }
-
- ///
- /// Adds a stream provider of type .
- ///
- /// The cluster configuration object to add provider to.
- /// The provider name
- /// The azure storage connection string. If none is provided, it will use the same as in the Globals configuration.
- /// The number of queues to use as partitions.
- /// The ClusterId used for partitioning. If none is specified, the provider will use the same ClusterId as the Cluster.
- /// The cache size.
- /// The startup state of the persistent stream provider.
- /// Settings related to all persistent stream providers.
- public static void AddAzureQueueStreamProviderV2(
- this ClientConfiguration config,
- string providerName,
- string connectionString = null,
- int numberOfQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue,
- string clusterId = null,
- int cacheSize = AzureQueueAdapterConstants.CacheSizeDefaultValue,
-#pragma warning disable 618
- PersistentStreamProviderState startupState = AzureQueueStreamProvider.StartupStateDefaultValue,
-#pragma warning restore 618
- PersistentStreamProviderConfig persistentStreamProviderConfig = null)
- {
- connectionString = GetConnectionString(connectionString, config);
- clusterId = clusterId ?? config.ClusterId;
- var properties = GetAzureQueueStreamProviderProperties(providerName, connectionString, numberOfQueues, clusterId, cacheSize, startupState, persistentStreamProviderConfig);
- config.RegisterStreamProvider(providerName, properties);
- }
-
- private static Dictionary GetAzureQueueStreamProviderProperties(string providerName, string connectionString, int numberOfQueues, string deploymentId, int cacheSize, PersistentStreamProviderState startupState, PersistentStreamProviderConfig persistentStreamProviderConfig)
- {
- if (string.IsNullOrWhiteSpace(providerName)) throw new ArgumentNullException(nameof(providerName));
- if (numberOfQueues < 1) throw new ArgumentOutOfRangeException(nameof(numberOfQueues));
-
- var properties = new Dictionary
- {
- { AzureQueueAdapterConstants.DataConnectionStringPropertyName, connectionString },
- { AzureQueueAdapterConstants.NumQueuesPropertyName, numberOfQueues.ToString() },
- { AzureQueueAdapterConstants.DeploymentIdPropertyName, deploymentId },
- { SimpleQueueAdapterCache.CacheSizePropertyName, cacheSize.ToString() },
-#pragma warning disable 618
- { AzureQueueStreamProvider.StartupStatePropertyName, startupState.ToString() },
-#pragma warning restore 618
- };
-
- persistentStreamProviderConfig?.WriteProperties(properties);
-
- return properties;
- }
-
- private static string GetConnectionString(string connectionString, ClusterConfiguration config)
- {
- if (!string.IsNullOrWhiteSpace(connectionString)) return connectionString;
- if (!string.IsNullOrWhiteSpace(config.Globals.DataConnectionString)) return config.Globals.DataConnectionString;
-
- throw new ArgumentNullException(nameof(connectionString),
- "Parameter value and fallback value are both null or empty.");
- }
-
- private static string GetConnectionString(string connectionString, ClientConfiguration config)
- {
- if (!string.IsNullOrWhiteSpace(connectionString)) return connectionString;
- if (!string.IsNullOrWhiteSpace(config.DataConnectionString)) return config.DataConnectionString;
-
- throw new ArgumentNullException(nameof(connectionString),
- "Parameter value and fallback value are both null or empty.");
- }
- }
-}
\ No newline at end of file
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueAdapterConstants.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueAdapterConstants.cs
deleted file mode 100644
index 786ff099ccb..00000000000
--- a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueAdapterConstants.cs
+++ /dev/null
@@ -1,22 +0,0 @@
-namespace Orleans.Providers.Streams.AzureQueue
-{
- ///
- /// Azure queue stream provider constants.
- ///
- public static class AzureQueueAdapterConstants
- {
- internal const int CacheSizeDefaultValue = 4096;
-
- /// "DataConnectionString".
- public const string DataConnectionStringPropertyName = "DataConnectionString";
- /// "DeploymentId".
- public const string DeploymentIdPropertyName = "DeploymentId";
- /// "MessageVisibilityTimeout".
- public const string MessageVisibilityTimeoutPropertyName = "VisibilityTimeout";
-
- /// "NumQueues".
- public const string NumQueuesPropertyName = "NumQueues";
- /// Default number of Azure Queue used in this stream provider.
- public const int NumQueuesDefaultValue = 8; // keep as power of 2.
- }
-}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueAdapterFactory.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueAdapterFactory.cs
index a43e920193a..0c89e300047 100644
--- a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueAdapterFactory.cs
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueAdapterFactory.cs
@@ -2,9 +2,11 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Orleans.Serialization;
using Orleans.Streams;
using Orleans.Providers.Streams.Common;
+using Orleans.Configuration;
namespace Orleans.Providers.Streams.AzureQueue
{
@@ -12,78 +14,47 @@ namespace Orleans.Providers.Streams.AzureQueue
public class AzureQueueAdapterFactory : IQueueAdapterFactory
where TDataAdapter : IAzureQueueDataAdapter
{
- private string deploymentId;
- private string dataConnectionString;
- private string providerName;
- private int cacheSize;
- private int numQueues;
- private TimeSpan? messageVisibilityTimeout;
+ private readonly string providerName;
+ private readonly AzureQueueStreamOptions options;
+ private readonly SiloOptions siloOptions;
+ private readonly ILoggerFactory loggerFactory;
+ private readonly Func dataAadaptorFactory;
private HashRingBasedStreamQueueMapper streamQueueMapper;
private IQueueAdapterCache adapterCache;
- private Func adaptorFactory;
- private ILoggerFactory loggerFactory;
///
/// Gets the serialization manager.
///
- public SerializationManager SerializationManager { get; private set; }
+
+ protected SerializationManager SerializationManager { get; }
///
/// Application level failure handler override.
///
protected Func> StreamFailureHandlerFactory { private get; set; }
- /// Init the factory.
- public virtual void Init(IProviderConfiguration config, string providerName, IServiceProvider serviceProvider)
+ public AzureQueueAdapterFactory(string name, AzureQueueStreamOptions options, IServiceProvider serviceProvider, IOptions siloOptions, SerializationManager serializationManager, ILoggerFactory loggerFactory)
{
- if (config == null) throw new ArgumentNullException(nameof(config));
- if (!config.Properties.TryGetValue(AzureQueueAdapterConstants.DataConnectionStringPropertyName, out dataConnectionString))
- throw new ArgumentException($"{AzureQueueAdapterConstants.DataConnectionStringPropertyName} property not set");
- if (!config.Properties.TryGetValue(AzureQueueAdapterConstants.DeploymentIdPropertyName, out deploymentId))
- throw new ArgumentException($"{AzureQueueAdapterConstants.DeploymentIdPropertyName} property not set");
- string messageVisibilityTimeoutRaw;
- if (config.Properties.TryGetValue(AzureQueueAdapterConstants.MessageVisibilityTimeoutPropertyName, out messageVisibilityTimeoutRaw))
- {
- TimeSpan messageVisibilityTimeoutTemp;
- if (!TimeSpan.TryParse(messageVisibilityTimeoutRaw, out messageVisibilityTimeoutTemp))
- {
- throw new ArgumentException(
- $"Failed to parse {AzureQueueAdapterConstants.MessageVisibilityTimeoutPropertyName} value '{messageVisibilityTimeoutRaw}' as a TimeSpan");
- }
-
- messageVisibilityTimeout = messageVisibilityTimeoutTemp;
- }
- else
- {
- messageVisibilityTimeout = null;
- }
-
- cacheSize = SimpleQueueAdapterCache.ParseSize(config, AzureQueueAdapterConstants.CacheSizeDefaultValue);
- this.loggerFactory = serviceProvider.GetRequiredService();
- string numQueuesString;
- numQueues = AzureQueueAdapterConstants.NumQueuesDefaultValue;
- if (config.Properties.TryGetValue(AzureQueueAdapterConstants.NumQueuesPropertyName, out numQueuesString))
- {
- if (!int.TryParse(numQueuesString, out numQueues))
- throw new ArgumentException($"{AzureQueueAdapterConstants.NumQueuesPropertyName} invalid. Must be int");
- }
-
- this.providerName = providerName;
- streamQueueMapper = new HashRingBasedStreamQueueMapper(numQueues, providerName);
- adapterCache = new SimpleQueueAdapterCache(cacheSize, providerName, loggerFactory);
- if (StreamFailureHandlerFactory == null)
- {
- StreamFailureHandlerFactory =
- qid => Task.FromResult(new NoOpStreamDeliveryFailureHandler());
- }
+ this.providerName = name;
+ this.options = options ?? throw new ArgumentNullException(nameof(options));
+ this.siloOptions = siloOptions.Value;
+ this.SerializationManager = serializationManager ?? throw new ArgumentNullException(nameof(serializationManager));
+ this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
+ this.dataAadaptorFactory = () => ActivatorUtilities.GetServiceOrCreateInstance(serviceProvider);
+ }
- this.SerializationManager = serviceProvider.GetRequiredService();
- this.adaptorFactory = () => ActivatorUtilities.GetServiceOrCreateInstance(serviceProvider);
+ /// Init the factory.
+ public virtual void Init()
+ {
+ this.streamQueueMapper = new HashRingBasedStreamQueueMapper(this.options.NumQueues, providerName);
+ this.adapterCache = new SimpleQueueAdapterCache(this.options.CacheSize, this.providerName, this.loggerFactory);
+ this.StreamFailureHandlerFactory = this.StreamFailureHandlerFactory ??
+ ((qid) => Task.FromResult(new NoOpStreamDeliveryFailureHandler()));
}
/// Creates the Azure Queue based adapter.
public virtual Task CreateAdapter()
{
- var adapter = new AzureQueueAdapter(this.adaptorFactory(), this.SerializationManager, streamQueueMapper, this.loggerFactory, dataConnectionString, deploymentId, providerName, messageVisibilityTimeout);
+ var adapter = new AzureQueueAdapter(this.dataAadaptorFactory(), this.SerializationManager, this.streamQueueMapper, this.loggerFactory, this.options.ConnectionString, this.options.ClusterId ?? this.siloOptions.ClusterId, this.providerName, this.options.MessageVisibilityTimeout);
return Task.FromResult(adapter);
}
@@ -108,5 +79,13 @@ public Task GetDeliveryFailureHandler(QueueId queueId)
{
return StreamFailureHandlerFactory(queueId);
}
+
+ public static AzureQueueAdapterFactory Create(IServiceProvider services, string name)
+ {
+ IOptionsSnapshot streamOptionsSnapshot = services.GetRequiredService>();
+ var factory = ActivatorUtilities.CreateInstance>(services, name, streamOptionsSnapshot.Get(name));
+ factory.Init();
+ return factory;
+ }
}
}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamOptions.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamOptions.cs
new file mode 100644
index 00000000000..891b256ee06
--- /dev/null
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamOptions.cs
@@ -0,0 +1,24 @@
+
+using System;
+
+namespace Orleans.Configuration
+{
+ ///
+ /// Azure queue stream provider options.
+ ///
+ public class AzureQueueStreamOptions : PersistentStreamOptions
+ {
+ [RedactConnectionString]
+ public string ConnectionString { get; set; }
+
+ public string ClusterId { get; set; }
+
+ public TimeSpan? MessageVisibilityTimeout { get; set; }
+
+ public int CacheSize { get; set; } = DEFAULT_CACHE_SIZE;
+ public const int DEFAULT_CACHE_SIZE = 4096;
+
+ public int NumQueues { get; set; } = DEFAULT_NUM_QUEUES;
+ public const int DEFAULT_NUM_QUEUES = 8; // keep as power of 2.
+ }
+}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamProvider.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamProvider.cs
deleted file mode 100644
index 4a9e0b1e33a..00000000000
--- a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamProvider.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-using System;
-using Orleans.Providers.Streams.Common;
-
-namespace Orleans.Providers.Streams.AzureQueue
-{
- ///
- /// Persistent stream provider that uses azure queue for persistence
- /// WARNING: This version is maintained for compatability purposes. New services should use AzureQueueStreamProviderV2 as it supports external serializers.
- ///
- [Obsolete("This version is maintained for compatability purposes. New services should use AzureQueueStreamProviderV2 as it supports external serializers.")]
- public class AzureQueueStreamProvider : PersistentStreamProvider>
- {
- }
-
- ///
- /// Persistent stream provider that uses azure queue for persistence
- ///
- public class AzureQueueStreamProviderV2 : PersistentStreamProvider>
- {
- }
-}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamProviderUtils.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamProviderUtils.cs
index 9a80bdfcb46..cf848a953b9 100644
--- a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamProviderUtils.cs
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/AzureQueueStreamProviderUtils.cs
@@ -3,6 +3,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.AzureUtils;
+using Orleans.Configuration;
using Orleans.Streams;
namespace Orleans.Providers.Streams.AzureQueue
@@ -23,7 +24,8 @@ public static async Task DeleteAllUsedAzureQueues(ILoggerFactory loggerFactory,
{
if (deploymentId != null)
{
- var queueMapper = new HashRingBasedStreamQueueMapper(AzureQueueAdapterConstants.NumQueuesDefaultValue, providerName);
+ // TODO: Do not assume defaults !? - jbragg
+ var queueMapper = new HashRingBasedStreamQueueMapper(AzureQueueStreamOptions.DEFAULT_NUM_QUEUES, providerName);
List allQueues = queueMapper.GetAllQueues().ToList();
var deleteTasks = new List();
@@ -48,7 +50,8 @@ public static async Task ClearAllUsedAzureQueues(ILoggerFactory loggerFactory, s
{
if (deploymentId != null)
{
- var queueMapper = new HashRingBasedStreamQueueMapper(AzureQueueAdapterConstants.NumQueuesDefaultValue, providerName);
+ // TODO: Do not assume defaults !? - jbragg
+ var queueMapper = new HashRingBasedStreamQueueMapper(AzureQueueStreamOptions.DEFAULT_NUM_QUEUES, providerName);
List allQueues = queueMapper.GetAllQueues().ToList();
var deleteTasks = new List();
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueAdapterFactory.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueAdapterFactory.cs
index 8c939402202..8ffbf7cb299 100644
--- a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueAdapterFactory.cs
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueAdapterFactory.cs
@@ -1,7 +1,7 @@
using System;
using System.Threading.Tasks;
-using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Streams;
@@ -10,30 +10,21 @@ namespace Orleans.Providers.Streams.AzureQueue
/// Factory class for Simple Azure Queue based stream provider.
public class SimpleAzureQueueAdapterFactory : IQueueAdapterFactory
{
- private string dataConnectionString;
- private string queueName;
+ private readonly SimpleAzureQueueStreamOptions options;
private string providerName;
private ILoggerFactory loggerFactory;
- /// "QueueName".
- public const string QUEUE_NAME_STRING = "QueueName";
- /// Init the factory.
- public virtual void Init(IProviderConfiguration config, string providerName, IServiceProvider serviceProvider)
+ public SimpleAzureQueueAdapterFactory(string name, SimpleAzureQueueStreamOptions options, IServiceProvider serviceProvider, ILoggerFactory loggerFactory)
{
- if (config == null) throw new ArgumentNullException("config");
- if (!config.Properties.TryGetValue(AzureQueueAdapterConstants.DataConnectionStringPropertyName, out dataConnectionString))
- throw new ArgumentException(String.Format("{0} property not set", AzureQueueAdapterConstants.DataConnectionStringPropertyName));
- if (!config.Properties.TryGetValue(QUEUE_NAME_STRING, out queueName))
- throw new ArgumentException(String.Format("{0} property not set", QUEUE_NAME_STRING));
- this.loggerFactory = serviceProvider.GetRequiredService();
- this.providerName = providerName;
+ this.providerName = name;
+ this.options = options ?? throw new ArgumentNullException(nameof(options));
+ this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
}
-
/// Creates the Simple Azure Queue based adapter.
public virtual Task CreateAdapter()
{
- var adapter = new SimpleAzureQueueAdapter(this.loggerFactory, dataConnectionString, providerName, queueName);
+ var adapter = new SimpleAzureQueueAdapter(this.loggerFactory, this.options.ConnectionString, this.providerName, this.options.QueueName);
return Task.FromResult(adapter);
}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueStreamOptions.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueStreamOptions.cs
new file mode 100644
index 00000000000..4783a86cc37
--- /dev/null
+++ b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueStreamOptions.cs
@@ -0,0 +1,14 @@
+
+namespace Orleans.Configuration
+{
+ ///
+ /// Simple Azure queue stream provider options.
+ ///
+ public class SimpleAzureQueueStreamOptions : PersistentStreamOptions
+ {
+ [RedactConnectionString]
+ public string ConnectionString { get; set; }
+
+ public string QueueName { get; set; }
+ }
+}
diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueStreamProvider.cs b/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueStreamProvider.cs
deleted file mode 100644
index 52f5ec804dd..00000000000
--- a/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/AzureQueue/SimpleAzureQueueStreamProvider.cs
+++ /dev/null
@@ -1,11 +0,0 @@
-using Orleans.Providers.Streams.Common;
-
-namespace Orleans.Providers.Streams.AzureQueue
-{
- ///
- /// Persistent stream provider that uses azure queue for persistence
- ///
- public class SimpleAzureQueueStreamProvider : PersistentStreamProvider
- {
- }
-}
diff --git a/src/Azure/Orleans.Streaming.EventHubs/Hosting/ClientBuilderExtensions.cs b/src/Azure/Orleans.Streaming.EventHubs/Hosting/ClientBuilderExtensions.cs
new file mode 100644
index 00000000000..a68973ed031
--- /dev/null
+++ b/src/Azure/Orleans.Streaming.EventHubs/Hosting/ClientBuilderExtensions.cs
@@ -0,0 +1,46 @@
+
+using System;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.Configuration;
+using Orleans.Hosting;
+using Orleans.ServiceBus.Providers;
+
+namespace Orleans.Hosting
+{
+ public static class ClientBuilderExtensions
+ {
+ ///
+ /// Configure cluster client to use event hub persistent streams.
+ ///
+ public static IClientBuilder AddEventHubStreams(this IClientBuilder builder, string name, Action configureOptions)
+ {
+ return builder.ConfigureServices(services => services.AddClusterClientEventHubStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use event hub persistent streams.
+ ///
+ public static IClientBuilder AddEventHubStreams(this IClientBuilder builder, string name, Action> configureOptions = null)
+ {
+ return builder.ConfigureServices(services => services.AddClusterClientEventHubStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use event hub persistent streams.
+ ///
+ public static IServiceCollection AddClusterClientEventHubStreams(this IServiceCollection services, string name, Action configureOptions)
+ {
+ return services.AddClusterClientEventHubStreams(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure cluster client to use event hub persistent streams.
+ ///
+ public static IServiceCollection AddClusterClientEventHubStreams(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ {
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddClusterClientPersistentStreams(name, EventHubAdapterFactory.Create, configureOptions);
+ }
+ }
+}
diff --git a/src/Azure/Orleans.Streaming.EventHubs/Hosting/SiloBuilderExtensions.cs b/src/Azure/Orleans.Streaming.EventHubs/Hosting/SiloBuilderExtensions.cs
new file mode 100644
index 00000000000..e88adceae47
--- /dev/null
+++ b/src/Azure/Orleans.Streaming.EventHubs/Hosting/SiloBuilderExtensions.cs
@@ -0,0 +1,45 @@
+using System;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.Configuration;
+using Orleans.Hosting;
+using Orleans.ServiceBus.Providers;
+
+namespace Orleans.Hosting
+{
+ public static class SiloBuilderExtensions
+ {
+ ///
+ /// Configure silo to use event hub persistent streams.
+ ///
+ public static ISiloHostBuilder AddEventHubStreams(this ISiloHostBuilder builder, string name, Action configureOptions)
+ {
+ return builder.ConfigureServices(services => services.AddSiloEventHubStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use event hub persistent streams.
+ ///
+ public static ISiloHostBuilder AddEventHubStreams(this ISiloHostBuilder builder, string name, Action> configureOptions = null)
+ {
+ return builder.ConfigureServices(services => services.AddSiloEventHubStreams(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use event hub persistent streams.
+ ///
+ public static IServiceCollection AddSiloEventHubStreams(this IServiceCollection services, string name, Action configureOptions)
+ {
+ return services.AddSiloEventHubStreams(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure silo to use event hub persistent streams.
+ ///
+ public static IServiceCollection AddSiloEventHubStreams(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ {
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddSiloPersistentStreams(name, EventHubAdapterFactory.Create, configureOptions);
+ }
+ }
+}
diff --git a/src/Azure/Orleans.Streaming.EventHubs/Orleans.Streaming.EventHubs.csproj b/src/Azure/Orleans.Streaming.EventHubs/Orleans.Streaming.EventHubs.csproj
index 3d1ac81e795..630423cbcb4 100644
--- a/src/Azure/Orleans.Streaming.EventHubs/Orleans.Streaming.EventHubs.csproj
+++ b/src/Azure/Orleans.Streaming.EventHubs/Orleans.Streaming.EventHubs.csproj
@@ -27,6 +27,7 @@
+
diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/EventDataGeneratorStreamProvider/EventDataGeneratorAdapterFactory.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/EventDataGeneratorStreamProvider/EventDataGeneratorAdapterFactory.cs
new file mode 100644
index 00000000000..e5c1c309e6c
--- /dev/null
+++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/EventDataGeneratorStreamProvider/EventDataGeneratorAdapterFactory.cs
@@ -0,0 +1,181 @@
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Azure.EventHubs;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Orleans.Configuration;
+using Orleans.Providers;
+using Orleans.Providers.Streams.Common;
+using Orleans.Runtime;
+using Orleans.Serialization;
+using Orleans.Streams;
+
+namespace Orleans.ServiceBus.Providers.Testing
+{
+ ///
+ /// This is a persistent stream provider adapter that generates it's own events rather than reading them from Eventhub.
+ /// This is primarily for test purposes.
+ ///
+ public class EventDataGeneratorAdapterFactory : EventHubAdapterFactory, IControllable
+ {
+ private EventDataGeneratorStreamOptions ehGeneratorOptions;
+
+ public EventDataGeneratorAdapterFactory(string name, EventDataGeneratorStreamOptions options, IServiceProvider serviceProvider, SerializationManager serializationManager, ITelemetryProducer telemetryProducer, ILoggerFactory loggerFactory)
+ : base(name, options, serviceProvider, serializationManager, telemetryProducer, loggerFactory)
+ {
+ this.ehGeneratorOptions = options;
+ }
+
+ public override void Init()
+ {
+ this.CheckpointerFactory = partition => Task.FromResult>(NoOpCheckpointer.Instance);
+ this.EventHubReceiverFactory = this.EHGeneratorReceiverFactory;
+ base.Init();
+ }
+
+ ///
+ protected override void InitEventHubClient()
+ {
+ //do nothing, EventDataGeneratorStreamProvider doesn't need connection with EventHubClient
+ }
+
+ ///
+ /// Generate mocked eventhub partition Ids from EventHubGeneratorStreamProviderSettings
+ ///
+ ///
+ protected override Task GetPartitionIdsAsync()
+ {
+ return Task.FromResult(GenerateEventHubPartitions(this.ehGeneratorOptions.EventHubPartitionCount));
+ }
+
+ private Task EHGeneratorReceiverFactory(EventHubPartitionSettings settings, string offset, ILogger logger, ITelemetryProducer telemetryProducer)
+ {
+ Func> streamGeneratorFactory = this.serviceProvider.GetServiceByName>>(this.Name)
+ ?? SimpleStreamEventDataGenerator.CreateFactory(this.serviceProvider);
+ var generator = new EventHubPartitionDataGenerator(this.ehGeneratorOptions, streamGeneratorFactory, logger);
+ var generatorReceiver = new EventHubPartitionGeneratorReceiver(generator);
+ return Task.FromResult(generatorReceiver);
+ }
+
+ private void RandomlyPlaceStreamToQueue(StreamRandomPlacementArg args)
+ {
+ if (args == null)
+ return;
+ int randomNumber = args.RandomNumber;
+ IStreamIdentity streamId = args.StreamId;
+ var allQueueInTheCluster = (this.EventHubQueueMapper as EventHubQueueMapper)?.GetAllQueues().OrderBy(queueId => queueId.ToString());
+
+ if (allQueueInTheCluster != null)
+ {
+ //every agent receive the same random number, do a mod on queue count, get the same random queue to assign stream to.
+ int randomQueue = randomNumber % allQueueInTheCluster.Count();
+ var queueToAssign = allQueueInTheCluster.ToList()[randomQueue];
+ EventHubAdapterReceiver receiverToAssign;
+ if (this.EventHubReceivers.TryGetValue(queueToAssign, out receiverToAssign))
+ {
+ receiverToAssign.ConfigureDataGeneratorForStream(streamId);
+ logger.Info($"Stream {streamId.Namespace}-{streamId.Guid.ToString()} is assigned to queue {queueToAssign.ToString()}");
+ }
+ }
+ else
+ {
+ logger.Info("Cannot get queues in the cluster, current streamQueueMapper is not EventHubQueueMapper");
+ }
+ }
+
+ private void StopProducingOnStream(IStreamIdentity streamId)
+ {
+ foreach (var ehReceiver in this.EventHubReceivers)
+ {
+ //if the stream is assigned to this receiver/queue, then it will ask the data generator to stop producing
+ ehReceiver.Value.StopProducingOnStream(streamId);
+ }
+ }
+
+ public static string[] GenerateEventHubPartitions(int partitionCount)
+ {
+ var size = partitionCount;
+ var partitions = new string[size];
+ for (int i = 0; i < size; i++)
+ partitions[i] = $"partition-{(i).ToString()}";
+ return partitions;
+ }
+
+ #region IControllable interface
+ ///
+ /// Commands for IControllable
+ ///
+ public enum Commands
+ {
+ ///
+ /// Command for Randomly_Place_Stream_To_Queue
+ ///
+ Randomly_Place_Stream_To_Queue = (int)PersistentStreamProviderCommand.AdapterFactoryCommandStartRange + 4,
+ ///
+ /// Command for Stop_Producing_On_Stream
+ ///
+ Stop_Producing_On_Stream = (int)PersistentStreamProviderCommand.AdapterFactoryCommandStartRange + 5
+ }
+
+ ///
+ /// Args for RandomlyPlaceStreamToQueue method
+ ///
+ [Serializable]
+ public class StreamRandomPlacementArg
+ {
+ ///
+ /// StreamId
+ ///
+ public IStreamIdentity StreamId { get; set; }
+
+ ///
+ /// A random number
+ ///
+ public int RandomNumber { get; set; }
+
+ ///
+ /// Constructor
+ ///
+ ///
+ ///
+ public StreamRandomPlacementArg(IStreamIdentity streamId, int randomNumber)
+ {
+ this.StreamId = streamId;
+ this.RandomNumber = randomNumber;
+ }
+ }
+
+ ///
+ /// Execute Command
+ ///
+ ///
+ ///
+ ///
+ public virtual Task