Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<TargetFramework>netcoreapp3.0</TargetFramework>

<IsPackable>false</IsPackable>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
Expand All @@ -11,19 +11,16 @@
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="3.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.3.0" />
<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="2.4.3">
<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Orleans.Core" Version="2.4.3" />
<PackageReference Include="Microsoft.Orleans.OrleansCodeGenerator" Version="2.4.3" />
<PackageReference Include="Microsoft.Orleans.OrleansProviders" Version="2.4.3" />
<PackageReference Include="Microsoft.Orleans.OrleansRuntime" Version="2.4.3" />
<PackageReference Include="Microsoft.Orleans.Server" Version="2.4.3" />
<PackageReference Include="Microsoft.Orleans.OrleansCodeGenerator" Version="3.0.0" />
<PackageReference Include="Microsoft.Orleans.OrleansRuntime" Version="3.0.0" />
<PackageReference Include="Microsoft.Orleans.Server" Version="3.0.0" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.15.1" />
<PackageReference Include="protobuf-net" Version="2.4.1" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
<PackageReference Include="ToxiproxyNetCore" Version="1.0.2" />
</ItemGroup>

Expand Down
2 changes: 0 additions & 2 deletions Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public static async Task<TestCluster> Create()
.UseLocalhostClustering(siloPort: 11111, gatewayPort: 30000)
.Configure<ClusterMembershipOptions>(options =>
{
options.ExpectedClusterSize = 2;
options.UseLivenessGossip = true;
options.ProbeTimeout = TimeSpan.FromSeconds(5);
options.NumMissedProbesLimit = 3;
Expand All @@ -80,7 +79,6 @@ public static async Task<TestCluster> Create()
primarySiloEndpoint: new IPEndPoint(IPAddress.Loopback, EndpointOptions.DEFAULT_SILO_PORT))
.Configure<ClusterMembershipOptions>(options =>
{
options.ExpectedClusterSize = 2;
options.UseLivenessGossip = true;
options.ProbeTimeout = TimeSpan.FromSeconds(5);
options.NumMissedProbesLimit = 3;
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

55 changes: 48 additions & 7 deletions Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,67 @@
using System;
using Orleans.Configuration;
using Orleans.Streaming;
using Orleans.Streams.BatchContainer;

namespace Orleans.Hosting
{
public static class ClientBuilderExtensions
{
/// <summary>
/// Configure client to use RMQ persistent streams, using the <see cref="DefaultBatchContainerSerializer"/>.
/// </summary>
[Obsolete("Use 'UseRabbitMqStreams'")]
public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action<ClusterClientRabbitMqStreamConfigurator<DefaultBatchContainerSerializer>> configure = null)
{
return UseRabbitMqStreams(builder, name, configure);
}

/// <summary>
/// Configure client to use RMQ persistent streams.
/// This version enables to inject a custom BacthContainer serializer.
/// </summary>
public static IClientBuilder AddRabbitMqStream<TSerializer>(this IClientBuilder builder, string name, Action<ClusterClientRabbitMqStreamConfigurator<TSerializer>> configure) where TSerializer : IBatchContainerSerializer, new()
[Obsolete("Use 'UseRabbitMqStreams'")]
public static IClientBuilder AddRabbitMqStream<TSerializer>(this IClientBuilder builder, string name, Action<ClusterClientRabbitMqStreamConfigurator<TSerializer>> configure = null)
where TSerializer : IBatchContainerSerializer, new()
{
configure?.Invoke(new ClusterClientRabbitMqStreamConfigurator<TSerializer>(name, builder));
return builder;
return UseRabbitMqStreams(builder, name, configure);
}

/// <summary>
/// Configure client to use RMQ persistent streams, using the <see cref="DefaultBatchContainerSerializer"/>.
/// </summary>
public static IClientBuilder UseRabbitMqStreams(this IClientBuilder builder, string name, Action<RabbitMqOptions> options)
{
return UseRabbitMqStreams<DefaultBatchContainerSerializer>(builder, name, options);
}

/// <summary>
/// Configure client to use RMQ persistent streams.
/// This version uses the default Orleans serializer.
/// </summary>
public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action<ClusterClientRabbitMqStreamConfigurator<DefaultBatchContainerSerializer>> configure)
=> AddRabbitMqStream<DefaultBatchContainerSerializer>(builder, name, configure);
public static IClientBuilder UseRabbitMqStreams<TSerializer>(this IClientBuilder builder, string name, Action<RabbitMqOptions> options)
where TSerializer : IBatchContainerSerializer, new()
{
return builder.UseRabbitMqStreams<TSerializer>(name, b => b.ConfigureRabbitMq(ob => ob.Configure(options)));

}

/// <summary>
/// Configure client to use RMQ persistent streams, using the <see cref="DefaultBatchContainerSerializer"/>.
/// </summary>
public static IClientBuilder UseRabbitMqStreams(this IClientBuilder builder, string name, Action<ClusterClientRabbitMqStreamConfigurator<DefaultBatchContainerSerializer>> configure = null)
{
return UseRabbitMqStreams<DefaultBatchContainerSerializer>(builder, name, configure);
}

/// <summary>
/// Configure client to use RMQ persistent streams.
/// </summary>
public static IClientBuilder UseRabbitMqStreams<TSerializer>(this IClientBuilder builder, string name, Action<ClusterClientRabbitMqStreamConfigurator<TSerializer>> configure = null)
where TSerializer : IBatchContainerSerializer, new()
{
var configurator = new ClusterClientRabbitMqStreamConfigurator<TSerializer>(name, builder);
configure?.Invoke(configurator);

return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ namespace Orleans.Configuration
{
public class RabbitMqOptions
{
public string HostName;
public int Port;
public string VirtualHost;
public string UserName;
public string Password;

public string QueueNamePrefix;
public bool UseQueuePartitioning = DefaultUseQueuePartitioning;
public int NumberOfQueues = DefaultNumberOfQueues;
public string HostName { get; set; }
public int Port { get; set; }
public string VirtualHost { get; set; }
public string UserName { get; set; }
public string Password { get; set; }

public string QueueNamePrefix { get; set; }
public bool UseQueuePartitioning { get; set; } = DefaultUseQueuePartitioning;
public int NumberOfQueues { get; set; } = DefaultNumberOfQueues;


public const bool DefaultUseQueuePartitioning = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Orleans.OrleansProviders" Version="2.4.3" />
<PackageReference Include="Microsoft.Orleans.OrleansProviders" Version="3.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
</ItemGroup>

Expand Down
19 changes: 12 additions & 7 deletions Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Providers.Streams.Common;
using Orleans.Serialization;
using Orleans.Streams.BatchContainer;
using Orleans.Streams.Cache;

namespace Orleans.Streams
{
Expand All @@ -29,7 +29,9 @@ public RabbitMqAdapterFactory(
if (serviceProvider == null) throw new ArgumentNullException(nameof(serviceProvider));
if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory));

_cache = new ConcurrentQueueAdapterCache(cachingOptions.CacheSize);
_cache = new SimpleQueueAdapterCache(new SimpleQueueCacheOptions() {CacheSize = cachingOptions.CacheSize}, providerName, loggerFactory);


_mapper = new HashRingBasedStreamQueueMapper(new HashRingStreamQueueMapperOptions { TotalQueueCount = rmqOptions.NumberOfQueues }, rmqOptions.QueueNamePrefix);
_failureHandler = Task.FromResult<IStreamFailureHandler>(new NoOpStreamDeliveryFailureHandler(false));

Expand All @@ -46,10 +48,13 @@ public RabbitMqAdapterFactory(
public IStreamQueueMapper GetStreamQueueMapper() => _mapper;

public static RabbitMqAdapterFactory<TSerializer> Create(IServiceProvider services, string name)
=> ActivatorUtilities.CreateInstance<RabbitMqAdapterFactory<TSerializer>>(
services,
name,
services.GetOptionsByName<RabbitMqOptions>(name),
services.GetOptionsByName<CachingOptions>(name));
{
var rabbitMqOptions = services.GetOptionsByName<RabbitMqOptions>(name);
var cachingOptions = services.GetOptionsByName<CachingOptions>(name);

var factory = ActivatorUtilities.CreateInstance<RabbitMqAdapterFactory<TSerializer>>(services, name, rabbitMqOptions, cachingOptions);

return factory;
}
}
}
Loading