diff --git a/Directory.Packages.props b/Directory.Packages.props index e6c8895..27b4f0a 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -30,6 +30,7 @@ + @@ -53,4 +54,4 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - + \ No newline at end of file diff --git a/Fluent.Brighter.sln b/Fluent.Brighter.sln index d75088d..532301b 100644 --- a/Fluent.Brighter.sln +++ b/Fluent.Brighter.sln @@ -47,6 +47,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.SqlServer", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlServerSample", "samples\SqlServerSample\SqlServerSample.csproj", "{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.Redis", "src\Fluent.Brighter.Redis\Fluent.Brighter.Redis.csproj", "{8DC14D04-F8D0-41CB-8964-B75257CEFE68}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisSample", "samples\RedisSample\RedisSample.csproj", "{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -285,6 +289,30 @@ Global {7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x64.Build.0 = Release|Any CPU {7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x86.ActiveCfg = Release|Any CPU {7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x86.Build.0 = Release|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x64.ActiveCfg = Debug|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x64.Build.0 = Debug|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x86.ActiveCfg = Debug|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x86.Build.0 = Debug|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|Any CPU.Build.0 = Release|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x64.ActiveCfg = Release|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x64.Build.0 = Release|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x86.ActiveCfg = Release|Any CPU + {8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x86.Build.0 = Release|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x64.ActiveCfg = Debug|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x64.Build.0 = Debug|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x86.ActiveCfg = Debug|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x86.Build.0 = Debug|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|Any CPU.Build.0 = Release|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x64.ActiveCfg = Release|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x64.Build.0 = Release|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.ActiveCfg = Release|Any CPU + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -309,5 +337,7 @@ Global {523946F8-241E-4DF6-92A3-875B1DDCDF39} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA} {1BBDF3C1-F4FF-404E-A5A9-6E3A743280BF} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} {7DB6D8C1-5EDE-4EF1-953B-515C26164F9C} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA} + {8DC14D04-F8D0-41CB-8964-B75257CEFE68} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA} EndGlobalSection EndGlobal diff --git a/samples/RedisSample/Commands/FarewellEvent.cs b/samples/RedisSample/Commands/FarewellEvent.cs new file mode 100644 index 0000000..1e3a748 --- /dev/null +++ b/samples/RedisSample/Commands/FarewellEvent.cs @@ -0,0 +1,12 @@ +using Paramore.Brighter; + +namespace RedisSample.Commands; + +public class FarewellEvent(string farewell) : Event(Id.Random()) +{ + public FarewellEvent() : this(string.Empty) + { + } + + public string Farewell { get; set; } = farewell; +} \ No newline at end of file diff --git a/samples/RedisSample/Commands/GreetingEvent.cs b/samples/RedisSample/Commands/GreetingEvent.cs new file mode 100644 index 0000000..9c6c5b9 --- /dev/null +++ b/samples/RedisSample/Commands/GreetingEvent.cs @@ -0,0 +1,10 @@ +using Paramore.Brighter; + +namespace RedisSample.Commands; + +public class GreetingEvent(string greeting) : Event(Id.Random()) +{ + public GreetingEvent() : this(string.Empty) { } + + public string Greeting { get; set; } = greeting; +} \ No newline at end of file diff --git a/samples/RedisSample/Handlers/FarewellEventHandler.cs b/samples/RedisSample/Handlers/FarewellEventHandler.cs new file mode 100644 index 0000000..4a243bb --- /dev/null +++ b/samples/RedisSample/Handlers/FarewellEventHandler.cs @@ -0,0 +1,14 @@ +using Paramore.Brighter; + +using RedisSample.Commands; + +namespace RedisSample.Handlers; + +public class FarewellEventHandler : RequestHandler +{ + public override FarewellEvent Handle(FarewellEvent @event) + { + Console.WriteLine(">>>>>> {0}", @event.Farewell); + return base.Handle(@event); + } +} \ No newline at end of file diff --git a/samples/RedisSample/Handlers/GreetingEventHandler.cs b/samples/RedisSample/Handlers/GreetingEventHandler.cs new file mode 100644 index 0000000..f47dce3 --- /dev/null +++ b/samples/RedisSample/Handlers/GreetingEventHandler.cs @@ -0,0 +1,14 @@ +using Paramore.Brighter; + +using RedisSample.Commands; + +namespace RedisSample.Handlers; + +public class GreetingEventHandler : RequestHandler +{ + public override GreetingEvent Handle(GreetingEvent @event) + { + Console.WriteLine("====== {0}", @event.Greeting); + return base.Handle(@event); + } +} \ No newline at end of file diff --git a/samples/RedisSample/Program.cs b/samples/RedisSample/Program.cs new file mode 100644 index 0000000..46ef1fc --- /dev/null +++ b/samples/RedisSample/Program.cs @@ -0,0 +1,80 @@ +// See https://aka.ms/new-console-template for more information + +using Fluent.Brighter; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +using Paramore.Brighter; +using Paramore.Brighter.ServiceActivator.Extensions.Hosting; + +using RedisSample.Commands; + +using Serilog; + +Log.Logger = new LoggerConfiguration() + .MinimumLevel.Information() + .Enrich.FromLogContext() + .WriteTo.Console() + .CreateLogger(); + + +var host = new HostBuilder() + .UseSerilog() + .ConfigureServices((_, services) => + { + services + .AddHostedService() + .AddFluentBrighter(builder => + { + builder + .UsingRedis(cfg => + { + cfg.SetConnection(c => c + .SetRedisConnectionString("redis://localhost:6379?ConnectTimeout=1000&SendTimeout=1000") + .SetMaxPoolSize(10) + .SetMessageTimeToLive(TimeSpan.FromMinutes(10)) + .SetDefaultRetryTimeout(3_000)); + + cfg + .UsePublications(pp => pp + .AddPublication(p => p.SetTopic("greeting.topic")) + .AddPublication(p => p.SetTopic("farewell.topic"))) + .UseSubscriptions(sb => sb + .AddSubscription(s => s + .SetTopic("greeting.topic") + .SetQueue("greeting.queue") + .SetMessagePumpType(MessagePumpType.Reactor)) + .AddSubscription(s => s + .SetTopic("farewell.topic") + .SetQueue("farewell.queue") + .SetMessagePumpType(MessagePumpType.Reactor))); + }); + }); + }) + .Build(); + +await host.StartAsync(); + +while (true) +{ + await Task.Delay(TimeSpan.FromSeconds(10)); + Console.Write("Say your name (or q to quit): "); + var name = Console.ReadLine(); + + if (string.IsNullOrEmpty(name)) + { + continue; + } + + if (name == "q") + { + break; + } + + var process = host.Services.GetRequiredService(); + await process.PostAsync(new GreetingEvent(name)); + await process.PostAsync(new FarewellEvent(name)); +} + +await host.StopAsync(); diff --git a/samples/RedisSample/RedisSample.csproj b/samples/RedisSample/RedisSample.csproj new file mode 100644 index 0000000..5c4cc60 --- /dev/null +++ b/samples/RedisSample/RedisSample.csproj @@ -0,0 +1,19 @@ + + + + Exe + net9.0 + enable + enable + false + + + + + + + + + + + diff --git a/samples/RedisSample/docker-compose.yml b/samples/RedisSample/docker-compose.yml new file mode 100644 index 0000000..89b4102 --- /dev/null +++ b/samples/RedisSample/docker-compose.yml @@ -0,0 +1,6 @@ +services: + redis: + image: redis + ports: + - "6379:6379" + \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/Extensions/ConsumerBuilderExtensions.cs b/src/Fluent.Brighter.Redis/Extensions/ConsumerBuilderExtensions.cs new file mode 100644 index 0000000..725e7a2 --- /dev/null +++ b/src/Fluent.Brighter.Redis/Extensions/ConsumerBuilderExtensions.cs @@ -0,0 +1,86 @@ +using System; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.Redis; + +namespace Fluent.Brighter.Redis.Extensions; + +/// +/// Provides extension methods for to configure Redis-based message consumers. +/// These extensions enable easy setup of Redis subscriptions and channel factories. +/// +public static class ConsumerBuilderExtensions +{ + /// + /// Adds a pre-configured Redis subscription to the consumer builder. + /// + /// The instance to configure. + /// The pre-configured to add. + /// The instance for method chaining. + public static ConsumerBuilder AddRedisSubscription(this ConsumerBuilder builder, RedisSubscription subscription) + { + return builder.AddSubscription(subscription); + } + + /// + /// Adds a Redis subscription to the consumer builder using a configuration action. + /// This method creates a new , applies the configuration, and builds the subscription. + /// + /// The instance to configure. + /// An action that configures the with subscription settings. + /// The instance for method chaining. + public static ConsumerBuilder AddRedisSubscription(this ConsumerBuilder builder, + Action configure) + { + var sub = new RedisSubscriptionBuilder(); + configure(sub); + return builder.AddSubscription(sub.Build()); + } + + /// + /// Adds a Redis subscription for a specific request type to the consumer builder. + /// This method automatically configures the subscription with the specified type + /// and then applies additional configuration provided by the action. + /// + /// The type of request/message that this subscription will handle. Must implement . + /// The instance to configure. + /// An action that configures the with additional settings. + /// The instance for method chaining. + public static ConsumerBuilder AddRedisSubscription(this ConsumerBuilder builder, + Action configure) + where TRequest : class, IRequest + { + var sub = new RedisSubscriptionBuilder(); + sub.SetDataType(typeof(TRequest)); + configure(sub); + return builder.AddSubscription(sub.Build()); + } + + /// + /// Adds a Redis channel factory to the consumer builder using a configuration builder. + /// Channel factories are responsible for creating message channels that consume messages from Redis. + /// + /// The instance to configure. + /// An action that configures the with connection details. + /// The instance for method chaining. + public static ConsumerBuilder AdRedisChannelFactory(this ConsumerBuilder builder, + Action configure) + { + var connection = new RedisMessagingGatewayConfigurationBuilder(); + configure(connection); + return builder.AddRedisChannelFactory(connection.Build()); + } + + /// + /// Adds a Redis channel factory to the consumer builder using a pre-configured Redis messaging gateway configuration. + /// Channel factories are responsible for creating message channels that consume messages from Redis. + /// + /// The instance to configure. + /// The pre-configured Redis messaging gateway configuration containing connection details. + /// The instance for method chaining. + public static ConsumerBuilder AddRedisChannelFactory(this ConsumerBuilder builder, RedisMessagingGatewayConfiguration connection) + { + return builder + .AddChannelFactory(new ChannelFactory(new RedisMessageConsumerFactory(connection))); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/Extensions/FluentBrighterExtensions.cs b/src/Fluent.Brighter.Redis/Extensions/FluentBrighterExtensions.cs new file mode 100644 index 0000000..dbb61f4 --- /dev/null +++ b/src/Fluent.Brighter.Redis/Extensions/FluentBrighterExtensions.cs @@ -0,0 +1,30 @@ +using System; + +using Fluent.Brighter.Redis; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for to configure Redis integration. +/// These extensions enable easy setup of Redis-based messaging, including message subscriptions and publications. +/// +public static class FluentBrighterExtensions +{ + /// + /// Configures Fluent Brighter to use Redis for messaging infrastructure. + /// This method provides a fluent API for setting up all Redis-related features including + /// message queues, subscriptions, publications, and Redis-specific settings. + /// + /// The instance to configure. + /// An action that configures the with Redis-specific settings. + /// The instance for method chaining. + /// Thrown when the Redis configuration is invalid or incomplete (e.g., no Redis connection configured). + public static FluentBrighterBuilder UsingRedis(this FluentBrighterBuilder builder, + Action configure) + { + var configurator = new RedisConfigurator(); + configure(configurator); + configurator.SetFluentBrighter(builder); + return builder; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/Extensions/ProducersExtensions.cs b/src/Fluent.Brighter.Redis/Extensions/ProducersExtensions.cs new file mode 100644 index 0000000..11cd10b --- /dev/null +++ b/src/Fluent.Brighter.Redis/Extensions/ProducersExtensions.cs @@ -0,0 +1,28 @@ +using System; + +using Fluent.Brighter.Redis; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for to configure Redis-based message producers. +/// These extensions enable easy setup of Redis publications for message production. +/// +public static class ProducersExtensions +{ + /// + /// Adds a Redis publication to the producer builder using a configuration action. + /// Publications define how messages are published to Redis, including topic mappings and message metadata. + /// + /// The instance to configure. + /// An action that configures the with publication settings. + /// The instance for method chaining. + public static ProducerBuilder AddRedisPublication( + this ProducerBuilder builder, + Action configure) + { + var factory = new RedisMessageProducerFactoryBuilder(); + configure(factory); + return builder.AddMessageProducerFactory(factory.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/Extensions/RedisMessageProducerFactoryBuilderExtensions.cs b/src/Fluent.Brighter.Redis/Extensions/RedisMessageProducerFactoryBuilderExtensions.cs new file mode 100644 index 0000000..6d2110c --- /dev/null +++ b/src/Fluent.Brighter.Redis/Extensions/RedisMessageProducerFactoryBuilderExtensions.cs @@ -0,0 +1,67 @@ +using System; + +using Fluent.Brighter.Redis; + +using Paramore.Brighter; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for to simplify configuration. +/// These extensions enable fluent configuration of Redis message producer factories, including +/// connection settings and publication mappings. +/// +public static class RedisMessageProducerFactoryBuilderExtensions +{ + /// + /// Sets the Redis messaging gateway configuration using a fluent configuration builder. + /// This extension method creates a , applies the provided configuration, + /// and sets the resulting configuration on the producer factory builder. + /// + /// The instance to configure. + /// An action that configures the with connection details and settings. + /// The instance for method chaining. + public static RedisMessageProducerFactoryBuilder SetConfiguration(this RedisMessageProducerFactoryBuilder builder, + Action configure) + { + var configuration = new RedisMessagingGatewayConfigurationBuilder(); + configure(configuration); + return builder.SetConfiguration(configuration.Build()); + } + + /// + /// Adds a Redis publication to the producer factory using a configuration builder. + /// Publications define how messages are published to Redis, including topic mappings and message metadata. + /// + /// The instance to configure. + /// An action that configures the with publication settings. + /// The instance for method chaining. + public static RedisMessageProducerFactoryBuilder AddPublication( + this RedisMessageProducerFactoryBuilder builder, + Action configure) + { + var publication = new RedisPublicationBuilder(); + configure(publication); + return builder.AddPublication(publication.Build()); + } + + /// + /// Adds a Redis publication for a specific request type to the producer factory. + /// This method automatically configures the publication with the specified type + /// and then applies additional configuration provided by the action. + /// + /// The type of request/message that this publication will handle. Must implement . + /// The instance to configure. + /// An action that configures the with additional publication settings. + /// The instance for method chaining. + public static RedisMessageProducerFactoryBuilder AddPublication( + this RedisMessageProducerFactoryBuilder builder, + Action configure) + where TRequest : class, IRequest + { + var publication = new RedisPublicationBuilder(); + publication.SetRequestType(typeof(TRequest)); + configure(publication); + return builder.AddPublication(publication.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/Extensions/RedisPublicationBuilderExtensions.cs b/src/Fluent.Brighter.Redis/Extensions/RedisPublicationBuilderExtensions.cs new file mode 100644 index 0000000..5224a12 --- /dev/null +++ b/src/Fluent.Brighter.Redis/Extensions/RedisPublicationBuilderExtensions.cs @@ -0,0 +1,49 @@ +using Fluent.Brighter.Redis; + +using Paramore.Brighter; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for to simplify configuration with convenient helper methods. +/// These extensions provide readable, intention-revealing methods for configuring channel creation behavior. +/// +public static class RedisPublicationBuilderExtensions +{ + #region MakeChannels + /// + /// Configures the publication to create Redis topics/channels if they don't exist. + /// When a publication attempts to send to a non-existent topic, it will be automatically created. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisPublicationBuilder CreateTopicIfMissing(this RedisPublicationBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Create); + } + + /// + /// Configures the publication to validate that Redis topics/channels exist before publishing. + /// If a topic doesn't exist, an error will be raised rather than creating it automatically. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisPublicationBuilder ValidIfTopicExists(this RedisPublicationBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Validate); + } + + /// + /// Configures the publication to assume Redis topics/channels exist without validation. + /// No checks will be performed, and the publication will attempt to use the topic regardless. + /// This is the most performant option but requires topics to be pre-created. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisPublicationBuilder AssumeTopicExists(this RedisPublicationBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Assume); + } + + #endregion +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/Extensions/RedisSubscriptionBuilderExtensions.cs b/src/Fluent.Brighter.Redis/Extensions/RedisSubscriptionBuilderExtensions.cs new file mode 100644 index 0000000..7fb2d63 --- /dev/null +++ b/src/Fluent.Brighter.Redis/Extensions/RedisSubscriptionBuilderExtensions.cs @@ -0,0 +1,79 @@ + +using Fluent.Brighter.Redis; + +using Paramore.Brighter; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for to simplify configuration with convenient helper methods. +/// These extensions provide readable, intention-revealing methods for configuring message pump types and channel creation behavior. +/// +public static class RedisSubscriptionBuilderExtensions +{ + #region Message Pump + + /// + /// Configures the subscription to use the Proactor message pump pattern. + /// The Proactor pattern uses async/await for non-blocking I/O operations, making it ideal for + /// high-throughput scenarios and when handling many concurrent operations. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisSubscriptionBuilder UseProactorMode(this RedisSubscriptionBuilder builder) + { + return builder.SetMessagePumpType(MessagePumpType.Proactor); + } + + /// + /// Configures the subscription to use the Reactor message pump pattern. + /// The Reactor pattern uses synchronous processing, which can be simpler to reason about + /// and may be preferred for CPU-bound operations or simpler workflows. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisSubscriptionBuilder UseReactorMode(this RedisSubscriptionBuilder builder) + { + return builder.SetMessagePumpType(MessagePumpType.Reactor); + } + + #endregion + + #region MakeChannels + + /// + /// Configures the subscription to create Redis infrastructure (topics/channels) if it doesn't exist. + /// When a subscription attempts to consume from non-existent infrastructure, it will be automatically created. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisSubscriptionBuilder CreateInfrastructureIfMissing(this RedisSubscriptionBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Create); + } + + /// + /// Configures the subscription to validate that Redis infrastructure (topics/channels) exists before consuming. + /// If infrastructure doesn't exist, an error will be raised rather than creating it automatically. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisSubscriptionBuilder ValidIfInfrastructureExists(this RedisSubscriptionBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Validate); + } + + /// + /// Configures the subscription to assume Redis infrastructure (topics/channels) exists without validation. + /// No checks will be performed, and the subscription will attempt to consume regardless. + /// This is the most performant option but requires infrastructure to be pre-created. + /// + /// The instance to configure. + /// The instance for method chaining. + public static RedisSubscriptionBuilder AssumeInfrastructureExists(this RedisSubscriptionBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Assume); + } + + #endregion +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/Fluent.Brighter.Redis.csproj b/src/Fluent.Brighter.Redis/Fluent.Brighter.Redis.csproj new file mode 100644 index 0000000..0df1bdd --- /dev/null +++ b/src/Fluent.Brighter.Redis/Fluent.Brighter.Redis.csproj @@ -0,0 +1,18 @@ + + + + $(BrighterCoreTargetFrameworks) + enable + Fluent configuration extensions for Paramore.Brighter Redis messaging: provides fluent APIs to configure Redis publications and subscriptions, CloudEvents metadata, and channel setup for event-driven .NET applications. + Brighter;Redis;Messaging;MessageQueue;PubSub;EventDriven;Microservices;DistributedSystems;Fluent;Configuration;Builder;Publisher;Subscriber;Producer;Consumer;CloudEvents + + + + + + + + + + + diff --git a/src/Fluent.Brighter.Redis/RedisConfigurator.cs b/src/Fluent.Brighter.Redis/RedisConfigurator.cs new file mode 100644 index 0000000..fa7e661 --- /dev/null +++ b/src/Fluent.Brighter.Redis/RedisConfigurator.cs @@ -0,0 +1,107 @@ +using System; + +using Fluent.Brighter.Redis.Extensions; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.Redis; + +namespace Fluent.Brighter.Redis; + +/// +/// Provides a fluent API for configuring Redis integration with Fluent Brighter. +/// This configurator allows you to set up Redis-based messaging, including message subscriptions and publications. +/// +public sealed class RedisConfigurator +{ + private RedisMessagingGatewayConfiguration? _connection; + private Action _action = _ => { }; + + /// + /// Sets the Redis messaging gateway connection using a fluent configuration builder. + /// + /// An action that configures the with connection details and settings. + /// The current instance for method chaining. + public RedisConfigurator SetConnection(Action configure) + { + var connection = new RedisMessagingGatewayConfigurationBuilder(); + configure(connection); + _connection = connection.Build(); + return this; + } + + /// + /// Sets the Redis messaging gateway connection using a pre-configured object. + /// + /// The Redis messaging gateway configuration containing connection details. + /// The current instance for method chaining. + public RedisConfigurator SetConnection(RedisMessagingGatewayConfiguration connection) + { + _connection = connection; + return this; + } + + /// + /// Configures Redis-based message publications. + /// This method allows you to define how messages are published through Redis messaging gateway, + /// including topic mappings and producer settings. + /// + /// An action that configures the for publications. + /// The current instance for method chaining. + public RedisConfigurator UsePublications(Action configure) + { + _action += fluent => + { + fluent.Producers(producer => producer + .AddRedisPublication(cfg => + { + cfg.SetConfiguration(_connection!); + configure(cfg); + })); + }; + return this; + } + + /// + /// Configures Redis-based message subscriptions. + /// This method allows you to define how messages are consumed from Redis messaging gateway, + /// including channel configurations and subscription mappings for different message types. + /// + /// An action that configures the for subscriptions. + /// The current instance for method chaining. + public RedisConfigurator UseSubscriptions(Action configure) + { + _action += fluent => + { + fluent.Subscriptions(sub => + { + var channel = new ChannelFactory(new RedisMessageConsumerFactory(_connection!)); + var configurator = new RedisSubscriptionConfigurator(channel); + configure(configurator); + + sub.AddChannelFactory(channel); + + foreach (var subscription in configurator.Subscriptions) + { + sub.AddRedisSubscription(subscription); + } + }); + }; + return this; + } + + /// + /// Applies the configured Redis settings to the Fluent Brighter builder. + /// This method is called internally to set up all the configured Redis features. + /// + /// The Fluent Brighter builder instance to configure. + /// Thrown when no Redis connection has been configured via or . + internal void SetFluentBrighter(FluentBrighterBuilder fluentBrighter) + { + if (_connection == null) + { + throw new ConfigurationException("No MessagingGatewayConnection was set"); + } + + _action(fluentBrighter); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/RedisMessageProducerFactoryBuilder.cs b/src/Fluent.Brighter.Redis/RedisMessageProducerFactoryBuilder.cs new file mode 100644 index 0000000..d94b86b --- /dev/null +++ b/src/Fluent.Brighter.Redis/RedisMessageProducerFactoryBuilder.cs @@ -0,0 +1,70 @@ +using System.Collections.Generic; +using System.Linq; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.Redis; + +namespace Fluent.Brighter.Redis; + +/// +/// Builder class for configuring and creating a Redis message producer factory. +/// This factory is responsible for creating message producers that publish messages to Redis-based messaging gateway. +/// +public sealed class RedisMessageProducerFactoryBuilder +{ + private RedisMessagingGatewayConfiguration? _configuration; + + /// + /// Sets the Redis messaging gateway configuration to be used by the message producers. + /// + /// The Redis messaging gateway configuration containing connection details and settings. + /// The current instance for method chaining. + public RedisMessageProducerFactoryBuilder SetConfiguration(RedisMessagingGatewayConfiguration configuration) + { + _configuration = configuration; + return this; + } + + private List _publications = []; + + /// + /// Sets the collection of Redis message publications that define how messages are published. + /// Publications define the mapping between message types and Redis channels/topics. + /// This method replaces any previously configured publications. + /// + /// An array of configurations to set. + /// The current instance for method chaining. + public RedisMessageProducerFactoryBuilder SetPublications(params RedisMessagePublication[] publications) + { + _publications = publications.ToList(); + return this; + } + + /// + /// Adds a single publication configuration to the existing collection. + /// Publications define the mapping between message types and Redis channels/topics. + /// + /// The configuration to add. + /// The current instance for method chaining. + public RedisMessageProducerFactoryBuilder AddPublication(RedisMessagePublication publications) + { + _publications.Add(publications); + return this; + } + + /// + /// Builds and returns a configured instance. + /// This method is called internally to create the factory with the configured settings. + /// + /// A configured instance. + /// Thrown when no configuration has been set via . + internal RedisMessageProducerFactory Build() + { + if (_configuration == null) + { + throw new ConfigurationException("The configuration was not set"); + } + + return new RedisMessageProducerFactory(_configuration, _publications); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/RedisMessagingGatewayConfigurationBuilder.cs b/src/Fluent.Brighter.Redis/RedisMessagingGatewayConfigurationBuilder.cs new file mode 100644 index 0000000..94eba01 --- /dev/null +++ b/src/Fluent.Brighter.Redis/RedisMessagingGatewayConfigurationBuilder.cs @@ -0,0 +1,237 @@ +using System; + +using Paramore.Brighter.MessagingGateway.Redis; + +namespace Fluent.Brighter.Redis; + +/// +/// Builder class for configuring Redis messaging gateway settings. +/// This builder provides a fluent API for setting connection timeouts, pool sizes, retry behavior, +/// and other Redis-specific configuration options. +/// +public sealed class RedisMessagingGatewayConfigurationBuilder +{ + private int? _defaultConnectTimeout; + + /// + /// Sets the default RedisClient socket connect timeout. + /// + /// The connect timeout in milliseconds, or null for no timeout (default -1, None). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetDefaultConnectTimeout(int? defaultConnectTimeout) + { + _defaultConnectTimeout = defaultConnectTimeout; + return this; + } + + private int? _defaultSendTimeout; + + /// + /// Sets the default RedisClient socket send timeout. + /// + /// The send timeout in milliseconds, or null for no timeout (default -1, None). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetDefaultSendTimeout(int? defaultSendTimeout) + { + _defaultSendTimeout = defaultSendTimeout; + return this; + } + + private int? _defaultReceiveTimeout; + + /// + /// Sets the default RedisClient socket receive timeout. + /// + /// The receive timeout in milliseconds, or null for no timeout (default -1, None). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetDefaultReceiveTimeout(int? defaultReceiveTimeout) + { + _defaultReceiveTimeout = defaultReceiveTimeout; + return this; + } + + private int? _defaultIdleTimeOutSecs; + + /// + /// Sets the default idle timeout before a subscription is considered stale. + /// + /// The idle timeout in seconds, or null to use the default (240 seconds). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetDefaultIdleTimeOutSecs(int? defaultIdleTimeOutSecs) + { + _defaultIdleTimeOutSecs = defaultIdleTimeOutSecs; + return this; + } + + private int? _defaultRetryTimeout; + + /// + /// Sets the default retry timeout for automatic retry of failed operations. + /// + /// The retry timeout in milliseconds, or null to use the default (10,000ms). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetDefaultRetryTimeout(int? defaultRetryTimeout) + { + _defaultRetryTimeout = defaultRetryTimeout; + return this; + } + + private int? _bufferPoolMaxSize; + + /// + /// Sets the byte buffer size for operations to use a byte buffer pool. + /// + /// The buffer pool maximum size in bytes, or null to use the default (500kb). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetBufferPoolMaxSize(int? bufferPoolMaxSize) + { + _bufferPoolMaxSize = bufferPoolMaxSize; + return this; + } + + private bool? _verifyMasterConnections; + + /// + /// Sets whether connections to master hosts should be verified to ensure they're still master instances. + /// + /// True to verify master connections, false to skip verification, or null to use the default (true). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetVerifyMasterConnections(bool? verifyMasterConnections) + { + _verifyMasterConnections = verifyMasterConnections; + return this; + } + + private int? _hostLookupTimeoutMs; + + /// + /// Sets the connect timeout on clients used to find the next available host. + /// + /// The host lookup timeout in milliseconds, or null to use the default (200ms). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetHostLookupTimeoutMs(int? hostLookupTimeoutMs) + { + _hostLookupTimeoutMs = hostLookupTimeoutMs; + return this; + } + + private int? _assumeServerVersion; + + /// + /// Sets the assumed Redis server version to skip server version checks. + /// Specify the minimum version number (e.g., 2.8.12 => 2812, 2.9.1 => 2910). + /// + /// The assumed server version number, or null to perform version checks. + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetAssumeServerVersion(int? assumeServerVersion) + { + _assumeServerVersion = assumeServerVersion; + return this; + } + + private TimeSpan? _deactivatedClientsExpiry; + + /// + /// Sets how long to hold deactivated clients before disposing their subscription. + /// Use to dispose of deactivated clients immediately. + /// + /// The expiry time for deactivated clients, or null to use the default (1 minute). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetDeactivatedClientsExpiry(TimeSpan? deactivatedClientsExpiry) + { + _deactivatedClientsExpiry = deactivatedClientsExpiry; + return this; + } + + private bool? _disableVerboseLogging; + + /// + /// Sets whether debug logging should log detailed Redis operations. + /// + /// True to disable verbose logging, false to enable it, or null to use the default (false). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetDisableVerboseLogging(bool? disableVerboseLogging) + { + _disableVerboseLogging = disableVerboseLogging; + return this; + } + + private int? _backoffMultiplier; + + /// + /// Sets the exponential backoff interval for retrying connections on socket failure. + /// + /// The backoff multiplier in milliseconds, or null to use the default (10ms). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetBackoffMultiplier(int? backoffMultiplier) + { + _backoffMultiplier = backoffMultiplier; + return this; + } + + private int? _maxPoolSize; + + /// + /// Sets the maximum size of the Redis connection pool. + /// + /// The maximum pool size, or null for no limit (default None). + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetMaxPoolSize(int? maxPoolSize) + { + _maxPoolSize = maxPoolSize; + return this; + } + + private TimeSpan? _messageTimeToLive; + + /// + /// Sets how long message bodies persist in Redis before being reclaimed. + /// Once reclaimed, attempts to retrieve the message will fail and the message will be rejected. + /// + /// The time-to-live for messages, or null to use the default. + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetMessageTimeToLive(TimeSpan? messageTimeToLive) + { + _messageTimeToLive = messageTimeToLive; + return this; + } + + private string? _redisConnectionString; + + /// + /// Sets the Redis connection string that defines how to connect to Redis. + /// + /// The Redis connection string, or null if not specified. + /// The current instance for method chaining. + public RedisMessagingGatewayConfigurationBuilder SetRedisConnectionString(string? redisConnectionString) + { + _redisConnectionString = redisConnectionString; + return this; + } + + /// + /// Builds and returns a configured instance. + /// This method is called internally to create the configuration with all the configured settings. + /// + /// A configured instance. + internal RedisMessagingGatewayConfiguration Build() + { + return new RedisMessagingGatewayConfiguration + { + DefaultConnectTimeout = _defaultConnectTimeout, + DefaultSendTimeout = _defaultSendTimeout, + DefaultReceiveTimeout = _defaultReceiveTimeout, + DefaultIdleTimeOutSecs = _defaultIdleTimeOutSecs, + DefaultRetryTimeout = _defaultRetryTimeout, + BufferPoolMaxSize = _bufferPoolMaxSize, + VerifyMasterConnections = _verifyMasterConnections, + HostLookupTimeoutMs = _hostLookupTimeoutMs, + DeactivatedClientsExpiry = _deactivatedClientsExpiry, + DisableVerboseLogging = _disableVerboseLogging, + BackoffMultiplier = _backoffMultiplier, + MaxPoolSize = _maxPoolSize, + MessageTimeToLive = _messageTimeToLive, + RedisConnectionString = _redisConnectionString + }; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/RedisPublicationBuilder.cs b/src/Fluent.Brighter.Redis/RedisPublicationBuilder.cs new file mode 100644 index 0000000..f5cdfbe --- /dev/null +++ b/src/Fluent.Brighter.Redis/RedisPublicationBuilder.cs @@ -0,0 +1,184 @@ +using System; +using System.Collections.Generic; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.Redis; + +namespace Fluent.Brighter.Redis; + +/// +/// Builder class for configuring and creating a Redis publication. +/// A publication defines how messages are published to Redis, including topic mappings, +/// message metadata, CloudEvents properties, and Redis-specific settings. +/// +public sealed class RedisPublicationBuilder +{ + private Uri? _dataSchema; + + /// + /// Sets the data schema URI for CloudEvents messages. + /// This defines the schema that describes the structure of the message data. + /// + /// The URI pointing to the data schema definition, or null if no schema is defined. + /// The current instance for method chaining. + public RedisPublicationBuilder SetDataSchema(Uri? dataSchema) + { + _dataSchema = dataSchema; + return this; + } + + private OnMissingChannel _makeChannels = OnMissingChannel.Create; + + /// + /// Sets the behavior for handling missing channels/topics in Redis. + /// Determines whether to create, validate, or assume the existence of channels. + /// + /// The behavior to apply. Default is . + /// The current instance for method chaining. + public RedisPublicationBuilder SetMakeChannels(OnMissingChannel makeChannels) + { + _makeChannels = makeChannels; + return this; + } + + private Type? _requestType; + + /// + /// Sets the request type for the publication. + /// This defines the .NET type of the message/command/event that will be published. + /// + /// The of the request, or null if not specified. + /// The current instance for method chaining. + public RedisPublicationBuilder SetRequestType(Type? requestType) + { + _requestType = requestType; + return this; + } + + private Uri _source = new(MessageHeader.DefaultSource); + + /// + /// Sets the source URI for CloudEvents messages. + /// This identifies the context in which an event happened and is part of the CloudEvents specification. + /// + /// The URI identifying the source of the event. Default is . + /// The current instance for method chaining. + public RedisPublicationBuilder SetSource(Uri source) + { + _source = source; + return this; + } + + private string? _subject; + + /// + /// Sets the subject for CloudEvents messages. + /// This describes the subject of the event in the context of the event producer and is part of the CloudEvents specification. + /// + /// The subject of the event, or null if not specified. + /// The current instance for method chaining. + public RedisPublicationBuilder SetSubject(string? subject) + { + _subject = subject; + return this; + } + + private RoutingKey? _topic; + + /// + /// Sets the routing key (topic/channel name) where messages will be published in Redis. + /// This determines the destination topic for outgoing messages. + /// + /// The routing key identifying the Redis topic/channel, or null if not specified. + /// The current instance for method chaining. + public RedisPublicationBuilder SetTopic(RoutingKey? topic) + { + _topic = topic; + return this; + } + + private CloudEventsType _type = CloudEventsType.Empty; + + /// + /// Sets the CloudEvents type for messages. + /// This describes the type of event related to the originating occurrence and is part of the CloudEvents specification. + /// + /// The to apply. Default is . + /// The current instance for method chaining. + public RedisPublicationBuilder SetType(CloudEventsType type) + { + _type = type; + return this; + } + + private IDictionary? _defaultHeaders; + + /// + /// Sets default headers to be included with every published message. + /// These headers provide additional metadata that will be sent along with messages. + /// + /// A dictionary of default header key-value pairs, or null if no default headers are needed. + /// The current instance for method chaining. + public RedisPublicationBuilder SetDefaultHeaders(IDictionary? defaultHeaders) + { + _defaultHeaders = defaultHeaders; + return this; + } + + private IDictionary? _cloudEventsAdditionalProperties; + + /// + /// Sets additional CloudEvents extension properties to be included with published messages. + /// CloudEvents allows custom extension attributes beyond the standard specification to provide additional context. + /// + /// A dictionary of additional CloudEvents properties, or null if no additional properties are needed. + /// The current instance for method chaining. + /// + /// These properties are serialized alongside the core CloudEvents attributes when mapping to a CloudEvent message. + /// If any key conflicts with standard CloudEvents JSON properties (e.g., "id", "source", "type"), + /// the serializer will prioritize the value in this dictionary, potentially overriding standard properties. + /// Exercise caution to avoid unintended overwrites of core CloudEvents attributes. + /// + public RedisPublicationBuilder SetCloudEventsAdditionalProperties(IDictionary? cloudEventsAdditionalProperties) + { + _cloudEventsAdditionalProperties = cloudEventsAdditionalProperties; + return this; + } + + private string? _replyTo; + + /// + /// Sets the reply-to address for request-reply messaging patterns. + /// This specifies where response messages should be sent when implementing request-reply communication. + /// + /// The reply-to address/topic name, or null if no reply is expected. + /// The current instance for method chaining. + public RedisPublicationBuilder SetReplyTo(string? replyTo) + { + _replyTo = replyTo; + return this; + } + + /// + /// Builds and returns a configured instance. + /// This method is called internally to create the publication with all the configured settings + /// including CloudEvents properties, routing information, and Redis-specific options. + /// + /// A configured instance. + internal RedisMessagePublication Build() + { + return new RedisMessagePublication + { + DataSchema = _dataSchema, + MakeChannels = _makeChannels, + RequestType = _requestType, + Source = _source, + Subject = _subject, + Topic = _topic, + Type = _type, + DefaultHeaders = _defaultHeaders, + CloudEventsAdditionalProperties = _cloudEventsAdditionalProperties, + ReplyTo = _replyTo + }; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/RedisSubscriptionBuilder.cs b/src/Fluent.Brighter.Redis/RedisSubscriptionBuilder.cs new file mode 100644 index 0000000..dec1695 --- /dev/null +++ b/src/Fluent.Brighter.Redis/RedisSubscriptionBuilder.cs @@ -0,0 +1,328 @@ +using System; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.Redis; + +namespace Fluent.Brighter.Redis; + +/// +/// Builder class for configuring and creating a Redis subscription. +/// A subscription defines how messages are consumed from Redis, including channel settings, +/// message processing options, retry behavior, and Redis-specific configurations. +/// +public sealed class RedisSubscriptionBuilder +{ + private SubscriptionName _subscriptionName = new(Uuid.New().ToString("N")); + + /// + /// Sets the subscription name that uniquely identifies this subscription. + /// The subscription name is used to track and manage the subscription within the Brighter framework. + /// + /// The unique name for this subscription. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetSubscription(SubscriptionName subscriptionName) + { + _subscriptionName = subscriptionName; + return this; + } + + private ChannelName? _channelName; + private RoutingKey? _routingKey; + + /// + /// Sets the channel name and routing key for the subscription. + /// The channel represents the Redis queue/topic from which messages will be consumed. + /// This method automatically sets the routing key to match the channel name. + /// + /// The name of the channel to subscribe to. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetQueue(ChannelName channelName) + { + _channelName = channelName; + return this; + } + + /// + /// Sets the routing key (topic) for the subscription. + /// This determines which messages will be routed to this subscription based on the topic pattern. + /// + /// The routing key identifying the Redis topic pattern. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetTopic(RoutingKey routingKey) + { + _routingKey = routingKey; + return this; + } + + private Type? _dataType; + + /// + /// Sets the data type for messages in this subscription. + /// When provided, this method will automatically configure the subscription name, channel name, + /// and routing key based on the type name if they haven't been explicitly set. + /// + /// The .NET type of the messages that will be consumed, or null if not specified. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetDataType(Type? dataType) + { + _dataType = dataType; + + if (dataType != null) + { + var typeName = dataType.FullName!; + _subscriptionName ??= new SubscriptionName(typeName); + _channelName ??= new ChannelName(typeName); + _routingKey ??= new RoutingKey(typeName); + } + + return this; + } + + private Func? _getRequestType; + + /// + /// Sets a function that determines the request type dynamically based on the message content. + /// This allows for polymorphic message handling where different message types can be processed by different handlers. + /// + /// A function that takes a and returns the corresponding .NET type, or null to use the default behavior. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetGetRequestType(Func? getRequestType) + { + _getRequestType = getRequestType; + return this; + } + + private int _bufferSize = 1; + + /// + /// Sets the buffer size for the message channel. + /// The buffer size determines how many messages can be queued in memory before processing, + /// allowing for better throughput in high-volume scenarios. + /// + /// The buffer size (number of messages). Must be between 1 and 10. Default is 1. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetBufferSize(int bufferSize) + { + _bufferSize = bufferSize; + return this; + } + + private int _noOfPerformers = 1; + + /// + /// Sets the number of performer threads that will process messages concurrently. + /// Multiple performers allow parallel message processing, improving throughput for I/O-bound operations. + /// + /// The number of concurrent performers. Default is 1. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetNumberOfPerformers(int noOfPerformers) + { + _noOfPerformers = noOfPerformers; + return this; + } + + private TimeSpan? _timeOut; + + /// + /// Sets the timeout duration for message processing operations. + /// If a message handler takes longer than this timeout, the operation will be cancelled. + /// + /// The timeout duration, or null to use the default timeout (300ms). + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetTimeout(TimeSpan? timeout) + { + _timeOut = timeout; + return this; + } + + private int _requeueCount = -1; + + /// + /// Sets the maximum number of times a failed message will be requeued for retry. + /// Messages that fail processing can be retried up to this limit before being moved to a dead letter queue or discarded. + /// + /// The maximum requeue count. Use -1 for unlimited retries. Default is -1. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetRequeueCount(int requeueCount) + { + _requeueCount = requeueCount; + return this; + } + + private TimeSpan? _requeueDelay; + + /// + /// Sets the delay before a failed message is requeued for retry. + /// This provides a backoff period before attempting to process the message again, which can help with transient failures. + /// + /// The requeue delay duration, or null to use the default delay (zero). + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetRequeueDelay(TimeSpan? requeueDelay) + { + _requeueDelay = requeueDelay; + return this; + } + + private int _unacceptableMessageLimit; + + /// + /// Sets the limit for the number of unacceptable messages that can be received before the channel stops processing. + /// Unacceptable messages are those that cannot be deserialized or are malformed. This limit prevents endless processing of bad messages. + /// + /// The maximum number of unacceptable messages before stopping. Use 0 for unlimited. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetUnacceptableMessageLimit(int unacceptableMessageLimit) + { + _unacceptableMessageLimit = unacceptableMessageLimit; + return this; + } + + private MessagePumpType _messagePumpType = MessagePumpType.Proactor; + + /// + /// Sets the message pump type that determines how messages are processed. + /// Proactor uses async/await patterns for non-blocking I/O, while Reactor uses synchronous processing. + /// + /// The to use. Default is . + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetMessagePumpType(MessagePumpType messagePumpType) + { + _messagePumpType = messagePumpType; + return this; + } + + private IAmAChannelFactory? _channelFactory; + + /// + /// Sets a custom channel factory for creating message channels. + /// Use this when you need to provide custom channel creation logic beyond the default Redis channel factory. + /// + /// The custom channel factory implementation, or null to use the default. + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetChannelFactory(IAmAChannelFactory? channelFactory) + { + _channelFactory = channelFactory; + return this; + } + + private OnMissingChannel _onMissingChannel = OnMissingChannel.Create; + + /// + /// Sets the behavior for handling missing channels/topics in Redis. + /// Determines whether to create, validate, or assume the existence of channels when they are not found. + /// + /// The behavior to apply. Default is . + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetMakeChannels(OnMissingChannel onMissingChannel) + { + _onMissingChannel = onMissingChannel; + return this; + } + + private TimeSpan? _emptyChannelDelay; + + /// + /// Sets the delay before checking for messages again when the channel is empty. + /// This prevents tight polling loops and reduces load on Redis when no messages are available. + /// + /// The delay duration when the channel is empty, or null to use the default delay (500ms). + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetEmptyChannelDelay(TimeSpan? emptyChannelDelay) + { + _emptyChannelDelay = emptyChannelDelay; + return this; + } + + private TimeSpan? _channelFailureDelay; + + /// + /// Sets the delay before retrying channel operations after a failure. + /// This provides a backoff period when channel errors occur, preventing rapid retry loops that could overwhelm the system. + /// + /// The delay duration after a channel failure, or null to use the default delay (1000ms). + /// The current instance for method chaining. + public RedisSubscriptionBuilder SetChannelFailureDelay(TimeSpan? channelFailureDelay) + { + _channelFailureDelay = channelFailureDelay; + return this; + } + + /// + /// Builds and returns a configured instance. + /// This method is called internally to create the subscription with all the configured settings + /// including channel configuration, message processing options, and Redis-specific settings. + /// + /// A configured instance. + /// Thrown when required configuration is missing: subscription name, channel name, or routing key. + internal RedisSubscription Build() + { + if (_subscriptionName == null) + { + throw new ConfigurationException("Subscription name is required"); + } + + if (_channelName == null) + { + throw new ConfigurationException("Channel name is required"); + } + + if (_routingKey == null) + { + throw new ConfigurationException("Routing key is required"); + } + + return new TmpRedisSubscription( + subscriptionName: _subscriptionName, + channelName: _channelName, + routingKey: _routingKey, + requestType: _dataType, + getRequestType: _getRequestType, + bufferSize: _bufferSize, + noOfPerformers: _noOfPerformers, + timeOut: _timeOut, + requeueCount: _requeueCount, + requeueDelay: _requeueDelay, + unacceptableMessageLimit: _unacceptableMessageLimit, + messagePumpType: _messagePumpType, + channelFactory: _channelFactory, + makeChannels: _onMissingChannel, + emptyChannelDelay: _emptyChannelDelay, + channelFailureDelay: _channelFailureDelay + ); + } + + private class TmpRedisSubscription( + SubscriptionName subscriptionName, + ChannelName channelName, + RoutingKey routingKey, + Type? requestType = null, + Func? getRequestType = null, + int bufferSize = 1, + int noOfPerformers = 1, + TimeSpan? timeOut = null, + int requeueCount = -1, + TimeSpan? requeueDelay = null, + int unacceptableMessageLimit = 0, + MessagePumpType messagePumpType = MessagePumpType.Proactor, + IAmAChannelFactory? channelFactory = null, + OnMissingChannel makeChannels = OnMissingChannel.Create, + TimeSpan? emptyChannelDelay = null, + TimeSpan? channelFailureDelay = null) : RedisSubscription( + subscriptionName, + channelName, + routingKey, + requestType, + getRequestType, + bufferSize, + noOfPerformers, + timeOut, + requeueCount, + requeueDelay, + unacceptableMessageLimit, + messagePumpType, + channelFactory, + makeChannels, + emptyChannelDelay, + channelFailureDelay + ); +} \ No newline at end of file diff --git a/src/Fluent.Brighter.Redis/RedisSubscriptionConfigurator.cs b/src/Fluent.Brighter.Redis/RedisSubscriptionConfigurator.cs new file mode 100644 index 0000000..c4d5f20 --- /dev/null +++ b/src/Fluent.Brighter.Redis/RedisSubscriptionConfigurator.cs @@ -0,0 +1,66 @@ +using System; +using System.Collections.Generic; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.Redis; + +namespace Fluent.Brighter.Redis; + +/// +/// Configurator class for managing Redis message subscriptions. +/// This class provides a fluent API for adding and configuring multiple subscriptions +/// that consume messages from Redis-based messaging channels. +/// +/// The Redis channel factory used to create message channels for subscriptions. +public sealed class RedisSubscriptionConfigurator(ChannelFactory channelFactory) +{ + /// + /// Gets the list of configured Redis subscriptions. + /// This property is used internally to collect all subscriptions that have been added to the configurator. + /// + internal List Subscriptions { get; } = []; + + /// + /// Adds a pre-configured Redis subscription to the configurator. + /// + /// The instance to add. + /// The current instance for method chaining. + public RedisSubscriptionConfigurator AddSubscription(RedisSubscription subscription) + { + Subscriptions.Add(subscription); + return this; + } + + /// + /// Adds a Redis subscription using a builder configuration action. + /// This method creates a new , applies the provided configuration, + /// and automatically sets the channel factory before building the subscription. + /// + /// An action that configures the to define subscription settings. + /// The current instance for method chaining. + public RedisSubscriptionConfigurator AddSubscription(Action configure) + { + var sub = new RedisSubscriptionBuilder(); + sub.SetChannelFactory(channelFactory); + configure(sub); + return AddSubscription(sub.Build()); + } + + /// + /// Adds a Redis subscription for a specific request type using a builder configuration action. + /// This method automatically configures the subscription with the specified type + /// and then applies any additional configuration provided. + /// + /// The type of request/message that this subscription will handle. Must implement . + /// An action that configures the with additional settings. + /// The current instance for method chaining. + public RedisSubscriptionConfigurator AddSubscription(Action configure) + where TRequest : class, IRequest + { + return AddSubscription(cfg => + { + cfg.SetDataType(typeof(TRequest)); + configure(cfg); + }); + } +} \ No newline at end of file