Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Options;
using Orleans.Runtime.Configuration;

namespace Orleans.Configuration
{
Expand Down
48 changes: 0 additions & 48 deletions src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,6 @@ namespace Orleans.Runtime.Configuration
/// </summary>
public static class ConfigurationExtensions
{
/// <summary>
/// Adds a stream provider of type <see cref="SimpleMessageStreamProvider"/>
/// </summary>
/// <param name="config">The cluster configuration object to add provider to.</param>
/// <param name="providerName">The provider name.</param>
/// <param name="fireAndForgetDelivery">Specifies whether the producer waits for the consumer to process the event before continuing. Setting this to false is useful for troubleshooting serialization issues.</param>
/// <param name="optimizeForImmutableData">If set to true items transfered via the stream are always wrapped in Immutable for delivery.</param>>
/// <param name="pubSubType">Specifies how can grains subscribe to this stream.</param>
public static void AddSimpleMessageStreamProvider(this ClientConfiguration config, string providerName,
bool fireAndForgetDelivery = SimpleMessageStreamProvider.DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY,
bool optimizeForImmutableData = SimpleMessageStreamProvider.DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA,
StreamPubSubType pubSubType = SimpleMessageStreamProvider.DEFAULT_STREAM_PUBSUB_TYPE)
{
var properties = GetSimpleMessageStreamProviderConfiguration(providerName, fireAndForgetDelivery, optimizeForImmutableData, pubSubType);
config.RegisterStreamProvider<SimpleMessageStreamProvider>(providerName, properties);
}

/// <summary>
/// Adds a stream provider of type <see cref="SimpleMessageStreamProvider"/>
/// </summary>
/// <param name="config">The cluster configuration object to add provider to.</param>
/// <param name="providerName">The provider name.</param>
/// <param name="fireAndForgetDelivery">Specifies whether the producer waits for the consumer to process the event before continuing. Setting this to false is useful for troubleshooting serialization issues.</param>
/// <param name="optimizeForImmutableData">If set to true items transfered via the stream are always wrapped in Immutable for delivery.</param>
/// <param name="pubSubType">Specifies how can grains subscribe to this stream.</param>
public static void AddSimpleMessageStreamProvider(this ClusterConfiguration config, string providerName,
bool fireAndForgetDelivery = SimpleMessageStreamProvider.DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY,
bool optimizeForImmutableData = SimpleMessageStreamProvider.DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA,
StreamPubSubType pubSubType = SimpleMessageStreamProvider.DEFAULT_STREAM_PUBSUB_TYPE)
{
var properties = GetSimpleMessageStreamProviderConfiguration(providerName, fireAndForgetDelivery, optimizeForImmutableData, pubSubType);
config.Globals.RegisterStreamProvider<SimpleMessageStreamProvider>(providerName, properties);
}

/// <summary>
/// Configures all cluster nodes to use the specified startup class for dependency injection.
/// </summary>
Expand All @@ -59,19 +25,5 @@ public static void UseStartupType<TStartup>(this ClusterConfiguration config)

config.Defaults.StartupTypeName = startupName;
}

private static Dictionary<string, string> GetSimpleMessageStreamProviderConfiguration(string providerName, bool fireAndForgetDelivery, bool optimizeForImmutableData, StreamPubSubType pubSubType)
{
if (string.IsNullOrWhiteSpace(providerName)) throw new ArgumentNullException(nameof(providerName));

var properties = new Dictionary<string, string>
{
{ SimpleMessageStreamProvider.FIRE_AND_FORGET_DELIVERY, fireAndForgetDelivery.ToString() },
{ SimpleMessageStreamProvider.OPTIMIZE_FOR_IMMUTABLE_DATA, optimizeForImmutableData.ToString() },
{ SimpleMessageStreamProvider.STREAM_PUBSUB_TYPE, pubSubType.ToString() },
};

return properties;
}
}
}
43 changes: 43 additions & 0 deletions src/Orleans.Core/Streams/ClientStreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration;
using Orleans.Providers.Streams.Common;
using Orleans.Providers.Streams.SimpleMessageStream;
using Orleans.Runtime;
using Orleans.Streams;

Expand Down Expand Up @@ -49,5 +50,47 @@ public static IServiceCollection AddClusterClientPersistentStreams<TOptions>(thi
.AddSingletonNamedService<ILifecycleParticipant<IClusterClientLifecycle>>(name, (s, n) => ((PersistentStreamProvider)s.GetRequiredServiceByName<IStreamProvider>(n)).ParticipateIn<IClusterClientLifecycle>())
.AddSingletonNamedService<IQueueAdapterFactory>(name, adapterFactory);
}

/// <summary>
/// Configure client to use SimpleMessageProvider
/// </summary>
public static IClientBuilder AddSimpleMessageStreamProvider(this IClientBuilder builder, string name,
Action<SimpleMessageStreamProviderOptions> configureOptions)

{
return builder.ConfigureServices(services =>
services.AddClusterClientSimpleMessageStreamProvider(name, configureOptions));
}

/// <summary>
/// Configure client to use SimpleMessageProvider
/// </summary>
public static IClientBuilder AddSimpleMessageStreamProvider(this IClientBuilder builder, string name,
Action<OptionsBuilder<SimpleMessageStreamProviderOptions>> configureOptions = null)

{
return builder.ConfigureServices(services =>
services.AddClusterClientSimpleMessageStreamProvider(name, configureOptions));
}

/// <summary>
/// Configure client to use simple message stream provider
/// </summary>
public static IServiceCollection AddClusterClientSimpleMessageStreamProvider(this IServiceCollection services, string name,
Action<SimpleMessageStreamProviderOptions> configureOptions = null)
{
return services.AddClusterClientSimpleMessageStreamProvider(name, ob => ob.Configure(configureOptions));
}

/// <summary>
/// Configure client to use simple message provider
/// </summary>
public static IServiceCollection AddClusterClientSimpleMessageStreamProvider(this IServiceCollection services, string name,
Action<OptionsBuilder<SimpleMessageStreamProviderOptions>> configureOptions = null)
{
configureOptions?.Invoke(services.AddOptions<SimpleMessageStreamProviderOptions>(name));
return services.ConfigureNamedOptionForLogging<SimpleMessageStreamProviderOptions>(name)
.AddSingletonNamedService<IStreamProvider>(name, SimpleMessageStreamProvider.Create);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,76 +1,49 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Runtime;
using Orleans.Streams;
using Orleans.Streams.Core;
using Microsoft.Extensions.Logging;
using Orleans.Serialization;
using Orleans.Configuration;

namespace Orleans.Providers.Streams.SimpleMessageStream
{
using Orleans.Serialization;

public class SimpleMessageStreamProvider : IStreamProvider, IProvider, IInternalStreamProvider, IStreamSubscriptionManagerRetriever
public class SimpleMessageStreamProvider : IInternalStreamProvider, IStreamProvider, IStreamSubscriptionManagerRetriever
{
public string Name { get; private set; }

private ILogger logger;
private IStreamProviderRuntime providerRuntime;
private bool fireAndForgetDelivery;
private bool optimizeForImmutableData;
private StreamPubSubType pubSubType;
private ProviderStateManager stateManager = new ProviderStateManager();
private IRuntimeClient runtimeClient;
private IStreamSubscriptionManager streamSubscriptionManager;
private ILoggerFactory loggerFactory;
private SerializationManager serializationManager;

internal const string STREAM_PUBSUB_TYPE = "PubSubType";
internal const string FIRE_AND_FORGET_DELIVERY = "FireAndForgetDelivery";
internal const string OPTIMIZE_FOR_IMMUTABLE_DATA = "OptimizeForImmutableData";
internal const StreamPubSubType DEFAULT_STREAM_PUBSUB_TYPE = StreamPubSubType.ExplicitGrainBasedAndImplicit;
internal const bool DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY = false;
internal const bool DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA = true;
private SimpleMessageStreamProviderOptions options;
public bool IsRewindable { get { return false; } }

public Task Init(string name, IProviderRuntime providerUtilitiesManager, IProviderConfiguration config)
public SimpleMessageStreamProvider(string name, SimpleMessageStreamProviderOptions options,
ILoggerFactory loggerFactory, IProviderRuntime providerRuntime, SerializationManager serializationManager)
{
if (!stateManager.PresetState(ProviderState.Initialized)) return Task.CompletedTask;
this.loggerFactory = loggerFactory;
this.Name = name;
providerRuntime = (IStreamProviderRuntime) providerUtilitiesManager;
this.runtimeClient = this.providerRuntime.ServiceProvider.GetRequiredService<IRuntimeClient>();
this.serializationManager = this.providerRuntime.ServiceProvider.GetRequiredService<SerializationManager>();
fireAndForgetDelivery = config.GetBoolProperty(FIRE_AND_FORGET_DELIVERY, DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY);
optimizeForImmutableData = config.GetBoolProperty(OPTIMIZE_FOR_IMMUTABLE_DATA, DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA);

string pubSubTypeString;
pubSubType = !config.Properties.TryGetValue(STREAM_PUBSUB_TYPE, out pubSubTypeString)
? DEFAULT_STREAM_PUBSUB_TYPE
: (StreamPubSubType)Enum.Parse(typeof(StreamPubSubType), pubSubTypeString);
if (pubSubType == StreamPubSubType.ExplicitGrainBasedAndImplicit
|| pubSubType == StreamPubSubType.ExplicitGrainBasedOnly)
this.logger = loggerFactory.CreateLogger($"{this.GetType().FullName}.{name}");
this.options = options;
this.providerRuntime = providerRuntime as IStreamProviderRuntime;
this.runtimeClient = providerRuntime.ServiceProvider.GetService<IRuntimeClient>();
this.serializationManager = serializationManager;
if (this.options.PubSubType == StreamPubSubType.ExplicitGrainBasedAndImplicit
|| this.options.PubSubType == StreamPubSubType.ExplicitGrainBasedOnly)
{
this.streamSubscriptionManager = this.providerRuntime.ServiceProvider
.GetService<IStreamSubscriptionManagerAdmin>().GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly);
.GetService<IStreamSubscriptionManagerAdmin>()
.GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly);
}
this.loggerFactory = providerRuntime.ServiceProvider.GetRequiredService<ILoggerFactory>();
logger = loggerFactory.CreateLogger(this.GetType().FullName);
logger.Info("Initialized SimpleMessageStreamProvider with name {0} and with property FireAndForgetDelivery: {1}, OptimizeForImmutableData: {2} " +
"and PubSubType: {3}", Name, fireAndForgetDelivery, optimizeForImmutableData, pubSubType);
stateManager.CommitState();
return Task.CompletedTask;
}

public Task Start()
{
if (stateManager.PresetState(ProviderState.Started)) stateManager.CommitState();
return Task.CompletedTask;
}

public Task Close()
{
if (stateManager.PresetState(ProviderState.Closed)) stateManager.CommitState();
return Task.CompletedTask;
logger.Info(
"Initialized SimpleMessageStreamProvider with name {0} and with property FireAndForgetDelivery: {1}, OptimizeForImmutableData: {2} " +
"and PubSubType: {3}", Name, this.options.FireAndForgetDelivery, this.options.OptimizeForImmutableData,
this.options.PubSubType);
}

public IStreamSubscriptionManager GetStreamSubscriptionManager()
Expand All @@ -89,7 +62,7 @@ public IAsyncStream<T> GetStream<T>(Guid id, string streamNamespace)
IInternalAsyncBatchObserver<T> IInternalStreamProvider.GetProducerInterface<T>(IAsyncStream<T> stream)
{
return new SimpleMessageStreamProducer<T>((StreamImpl<T>)stream, Name, providerRuntime,
fireAndForgetDelivery, optimizeForImmutableData, providerRuntime.PubSub(pubSubType), IsRewindable,
this.options.FireAndForgetDelivery, this.options.OptimizeForImmutableData, providerRuntime.PubSub(this.options.PubSubType), IsRewindable,
this.serializationManager, this.loggerFactory);
}

Expand All @@ -101,7 +74,12 @@ IInternalAsyncObservable<T> IInternalStreamProvider.GetConsumerInterface<T>(IAsy
private IInternalAsyncObservable<T> GetConsumerInterfaceImpl<T>(IAsyncStream<T> stream)
{
return new StreamConsumer<T>((StreamImpl<T>)stream, Name, providerRuntime,
providerRuntime.PubSub(pubSubType), this.logger, IsRewindable);
providerRuntime.PubSub(this.options.PubSubType), this.logger, IsRewindable);
}

public static IStreamProvider Create(IServiceProvider services, string name)
{
return ActivatorUtilities.CreateInstance<SimpleMessageStreamProvider>(services, name, services.GetService<IOptionsSnapshot<SimpleMessageStreamProviderOptions>>().Get(name));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

using Orleans.Streams;

namespace Orleans.Configuration
{
public class SimpleMessageStreamProviderOptions
{
public bool FireAndForgetDelivery { get; set; } = DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY;
public const bool DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY = false;

public bool OptimizeForImmutableData { get; set; } = DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA;
public const bool DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA = true;

public StreamPubSubType PubSubType { get; set; } = DEFAULT_PUBSUB_TYPE;
public static StreamPubSubType DEFAULT_PUBSUB_TYPE = StreamPubSubType.ExplicitGrainBasedAndImplicit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Orleans.Providers.Streams.Common;
using Orleans.Providers;
using Orleans.Configuration;
using Orleans.Providers.Streams.SimpleMessageStream;

namespace Orleans.Hosting
{
Expand Down Expand Up @@ -51,5 +52,47 @@ public static IServiceCollection AddSiloPersistentStreams<TOptions>(this IServic
.AddSingletonNamedService<IQueueAdapterFactory>(name, adapterFactory)
.AddSingletonNamedService(name, (s, n) => s.GetServiceByName<IStreamProvider>(n) as IControllable);
}

/// <summary>
/// Configure silo to use SimpleMessageProvider
/// </summary>
public static ISiloHostBuilder AddSimpleMessageStreamProvider(this ISiloHostBuilder builder, string name,
Action<SimpleMessageStreamProviderOptions> configureOptions)

{
return builder.ConfigureServices(services =>
services.AddSiloSimpleMessageStreamProvider(name, configureOptions));
}

/// <summary>
/// Configure silo to use SimpleMessageProvider
/// </summary>
public static ISiloHostBuilder AddSimpleMessageStreamProvider(this ISiloHostBuilder builder, string name,
Action<OptionsBuilder<SimpleMessageStreamProviderOptions>> configureOptions = null)

{
return builder.ConfigureServices(services =>
services.AddSiloSimpleMessageStreamProvider(name, configureOptions));
}

/// <summary>
/// Configure silo to use simple message stream provider
/// </summary>
public static IServiceCollection AddSiloSimpleMessageStreamProvider(this IServiceCollection services, string name,
Action<SimpleMessageStreamProviderOptions> configureOptions = null)
{
return services.AddSiloSimpleMessageStreamProvider(name, ob => ob.Configure(configureOptions));
}

/// <summary>
/// Configure silo to use simple message provider
/// </summary>
public static IServiceCollection AddSiloSimpleMessageStreamProvider(this IServiceCollection services, string name,
Action<OptionsBuilder<SimpleMessageStreamProviderOptions>> configureOptions = null)
{
configureOptions?.Invoke(services.AddOptions<SimpleMessageStreamProviderOptions>(name));
return services.ConfigureNamedOptionForLogging<SimpleMessageStreamProviderOptions>(name)
.AddSingletonNamedService<IStreamProvider>(name, SimpleMessageStreamProvider.Create);
}
}
}
6 changes: 2 additions & 4 deletions test/AWSUtils.Tests/Streaming/SQSStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
//from the config files
options.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);

options.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false);

options.ClientConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false);

//previous silo creation
options.ClusterConfiguration.Globals.DataConnectionString = AWSTestConstants.DefaultSQSConnectionString;
options.ClientConfiguration.DataConnectionString = AWSTestConstants.DefaultSQSConnectionString;
Expand Down Expand Up @@ -66,6 +62,7 @@ private class MySiloBuilderConfigurator : ISiloBuilderConfigurator
public void Configure(ISiloHostBuilder hostBuilder)
{
hostBuilder
.AddSimpleMessageStreamProvider("SMSProvider")
.AddSqsStreams("SQSProvider", options =>
{
options.ConnectionString = AWSTestConstants.DefaultSQSConnectionString;
Expand All @@ -82,6 +79,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder
.AddSimpleMessageStreamProvider("SMSProvider")
.AddSqsStreams("SQSProvider", options =>
{
options.ConnectionString = AWSTestConstants.DefaultSQSConnectionString;
Expand Down
Loading