Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<PackageVersion Include="Paramore.Brighter.MessagingGateway.Kafka" Version="$(BrighterVersion)" />
<PackageVersion Include="Paramore.Brighter.MessagingGateway.MsSql" Version="$(BrighterVersion)" />
<PackageVersion Include="Paramore.Brighter.MessagingGateway.Postgres" Version="$(BrighterVersion)" />
<PackageVersion Include="Paramore.Brighter.MessagingGateway.Redis" Version="10.0.2" />
<PackageVersion Include="Paramore.Brighter.MessagingGateway.RMQ.Sync" Version="$(BrighterVersion)" />
<PackageVersion Include="Paramore.Brighter.MessagingGateway.RMQ.Async" Version="$(BrighterVersion)" />
<PackageVersion Include="Paramore.Brighter.MessagingGateway.RocketMQ" Version="$(BrighterVersion)" />
Expand All @@ -53,4 +54,4 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
</ItemGroup>
</Project>
</Project>
30 changes: 30 additions & 0 deletions Fluent.Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.SqlServer",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlServerSample", "samples\SqlServerSample\SqlServerSample.csproj", "{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.Redis", "src\Fluent.Brighter.Redis\Fluent.Brighter.Redis.csproj", "{8DC14D04-F8D0-41CB-8964-B75257CEFE68}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisSample", "samples\RedisSample\RedisSample.csproj", "{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -285,6 +289,30 @@ Global
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x64.Build.0 = Release|Any CPU
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x86.ActiveCfg = Release|Any CPU
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C}.Release|x86.Build.0 = Release|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x64.ActiveCfg = Debug|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x64.Build.0 = Debug|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x86.ActiveCfg = Debug|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Debug|x86.Build.0 = Debug|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|Any CPU.Build.0 = Release|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x64.ActiveCfg = Release|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x64.Build.0 = Release|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x86.ActiveCfg = Release|Any CPU
{8DC14D04-F8D0-41CB-8964-B75257CEFE68}.Release|x86.Build.0 = Release|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x64.ActiveCfg = Debug|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x64.Build.0 = Debug|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x86.ActiveCfg = Debug|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Debug|x86.Build.0 = Debug|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|Any CPU.Build.0 = Release|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x64.ActiveCfg = Release|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x64.Build.0 = Release|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.ActiveCfg = Release|Any CPU
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -309,5 +337,7 @@ Global
{523946F8-241E-4DF6-92A3-875B1DDCDF39} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA}
{1BBDF3C1-F4FF-404E-A5A9-6E3A743280BF} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
{7DB6D8C1-5EDE-4EF1-953B-515C26164F9C} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA}
{8DC14D04-F8D0-41CB-8964-B75257CEFE68} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA}
EndGlobalSection
EndGlobal
12 changes: 12 additions & 0 deletions samples/RedisSample/Commands/FarewellEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Paramore.Brighter;

namespace RedisSample.Commands;

public class FarewellEvent(string farewell) : Event(Id.Random())
{
public FarewellEvent() : this(string.Empty)
{
}

public string Farewell { get; set; } = farewell;
}
10 changes: 10 additions & 0 deletions samples/RedisSample/Commands/GreetingEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Paramore.Brighter;

namespace RedisSample.Commands;

public class GreetingEvent(string greeting) : Event(Id.Random())
{
public GreetingEvent() : this(string.Empty) { }

public string Greeting { get; set; } = greeting;
}
14 changes: 14 additions & 0 deletions samples/RedisSample/Handlers/FarewellEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Paramore.Brighter;

using RedisSample.Commands;

namespace RedisSample.Handlers;

public class FarewellEventHandler : RequestHandler<FarewellEvent>
{
public override FarewellEvent Handle(FarewellEvent @event)
{
Console.WriteLine(">>>>>> {0}", @event.Farewell);
return base.Handle(@event);
}
}
14 changes: 14 additions & 0 deletions samples/RedisSample/Handlers/GreetingEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Paramore.Brighter;

using RedisSample.Commands;

namespace RedisSample.Handlers;

public class GreetingEventHandler : RequestHandler<GreetingEvent>
{
public override GreetingEvent Handle(GreetingEvent @event)
{
Console.WriteLine("====== {0}", @event.Greeting);
return base.Handle(@event);
}
}
80 changes: 80 additions & 0 deletions samples/RedisSample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// See https://aka.ms/new-console-template for more information

using Fluent.Brighter;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

using Paramore.Brighter;
using Paramore.Brighter.ServiceActivator.Extensions.Hosting;

using RedisSample.Commands;

using Serilog;

Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.Enrich.FromLogContext()
.WriteTo.Console()
.CreateLogger();


var host = new HostBuilder()
.UseSerilog()
.ConfigureServices((_, services) =>
{
services
.AddHostedService<ServiceActivatorHostedService>()
.AddFluentBrighter(builder =>
{
builder
.UsingRedis(cfg =>
{
cfg.SetConnection(c => c
.SetRedisConnectionString("redis://localhost:6379?ConnectTimeout=1000&SendTimeout=1000")
.SetMaxPoolSize(10)
.SetMessageTimeToLive(TimeSpan.FromMinutes(10))
.SetDefaultRetryTimeout(3_000));

cfg
.UsePublications(pp => pp
.AddPublication<GreetingEvent>(p => p.SetTopic("greeting.topic"))
.AddPublication<FarewellEvent>(p => p.SetTopic("farewell.topic")))
.UseSubscriptions(sb => sb
.AddSubscription<GreetingEvent>(s => s
.SetTopic("greeting.topic")
.SetQueue("greeting.queue")
.SetMessagePumpType(MessagePumpType.Reactor))
.AddSubscription<FarewellEvent>(s => s
.SetTopic("farewell.topic")
.SetQueue("farewell.queue")
.SetMessagePumpType(MessagePumpType.Reactor)));
});
});
})
.Build();

await host.StartAsync();

while (true)
{
await Task.Delay(TimeSpan.FromSeconds(10));
Console.Write("Say your name (or q to quit): ");
var name = Console.ReadLine();

if (string.IsNullOrEmpty(name))
{
continue;
}

if (name == "q")
{
break;
}

var process = host.Services.GetRequiredService<IAmACommandProcessor>();
await process.PostAsync(new GreetingEvent(name));
await process.PostAsync(new FarewellEvent(name));
}

await host.StopAsync();
19 changes: 19 additions & 0 deletions samples/RedisSample/RedisSample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Fluent.Brighter.Redis\Fluent.Brighter.Redis.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Paramore.Brighter.ServiceActivator.Extensions.Hosting" />
<PackageReference Include="Serilog.AspNetCore" />
</ItemGroup>
</Project>
6 changes: 6 additions & 0 deletions samples/RedisSample/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
services:
redis:
image: redis
ports:
- "6379:6379"

86 changes: 86 additions & 0 deletions src/Fluent.Brighter.Redis/Extensions/ConsumerBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;

using Paramore.Brighter;
using Paramore.Brighter.MessagingGateway.Redis;

namespace Fluent.Brighter.Redis.Extensions;

/// <summary>
/// Provides extension methods for <see cref="ConsumerBuilder"/> to configure Redis-based message consumers.
/// These extensions enable easy setup of Redis subscriptions and channel factories.
/// </summary>
public static class ConsumerBuilderExtensions
{
/// <summary>
/// Adds a pre-configured Redis subscription to the consumer builder.
/// </summary>
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
/// <param name="subscription">The pre-configured <see cref="RedisSubscription"/> to add.</param>
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
public static ConsumerBuilder AddRedisSubscription(this ConsumerBuilder builder, RedisSubscription subscription)
{
return builder.AddSubscription(subscription);
}

/// <summary>
/// Adds a Redis subscription to the consumer builder using a configuration action.
/// This method creates a new <see cref="RedisSubscriptionBuilder"/>, applies the configuration, and builds the subscription.
/// </summary>
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
/// <param name="configure">An action that configures the <see cref="RedisSubscriptionBuilder"/> with subscription settings.</param>
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
public static ConsumerBuilder AddRedisSubscription(this ConsumerBuilder builder,
Action<RedisSubscriptionBuilder> configure)
{
var sub = new RedisSubscriptionBuilder();
configure(sub);
return builder.AddSubscription(sub.Build());
}

/// <summary>
/// Adds a Redis subscription for a specific request type to the consumer builder.
/// This method automatically configures the subscription with the specified <typeparamref name="TRequest"/> type
/// and then applies additional configuration provided by the action.
/// </summary>
/// <typeparam name="TRequest">The type of request/message that this subscription will handle. Must implement <see cref="IRequest"/>.</typeparam>
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
/// <param name="configure">An action that configures the <see cref="RedisSubscriptionBuilder"/> with additional settings.</param>
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
public static ConsumerBuilder AddRedisSubscription<TRequest>(this ConsumerBuilder builder,
Action<RedisSubscriptionBuilder> configure)
where TRequest : class, IRequest
{
var sub = new RedisSubscriptionBuilder();
sub.SetDataType(typeof(TRequest));
configure(sub);
return builder.AddSubscription(sub.Build());
}

/// <summary>
/// Adds a Redis channel factory to the consumer builder using a configuration builder.
/// Channel factories are responsible for creating message channels that consume messages from Redis.
/// </summary>
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
/// <param name="configure">An action that configures the <see cref="RedisMessagingGatewayConfigurationBuilder"/> with connection details.</param>
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
public static ConsumerBuilder AdRedisChannelFactory(this ConsumerBuilder builder,
Action<RedisMessagingGatewayConfigurationBuilder> configure)
{
var connection = new RedisMessagingGatewayConfigurationBuilder();
configure(connection);
return builder.AddRedisChannelFactory(connection.Build());
}

/// <summary>
/// Adds a Redis channel factory to the consumer builder using a pre-configured Redis messaging gateway configuration.
/// Channel factories are responsible for creating message channels that consume messages from Redis.
/// </summary>
/// <param name="builder">The <see cref="ConsumerBuilder"/> instance to configure.</param>
/// <param name="connection">The pre-configured Redis messaging gateway configuration containing connection details.</param>
/// <returns>The <see cref="ConsumerBuilder"/> instance for method chaining.</returns>
public static ConsumerBuilder AddRedisChannelFactory(this ConsumerBuilder builder, RedisMessagingGatewayConfiguration connection)
{
return builder
.AddChannelFactory(new ChannelFactory(new RedisMessageConsumerFactory(connection)));
}
}
30 changes: 30 additions & 0 deletions src/Fluent.Brighter.Redis/Extensions/FluentBrighterExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;

using Fluent.Brighter.Redis;

namespace Fluent.Brighter;

/// <summary>
/// Provides extension methods for <see cref="FluentBrighterBuilder"/> to configure Redis integration.
/// These extensions enable easy setup of Redis-based messaging, including message subscriptions and publications.
/// </summary>
public static class FluentBrighterExtensions
{
/// <summary>
/// Configures Fluent Brighter to use Redis for messaging infrastructure.
/// This method provides a fluent API for setting up all Redis-related features including
/// message queues, subscriptions, publications, and Redis-specific settings.
/// </summary>
/// <param name="builder">The <see cref="FluentBrighterBuilder"/> instance to configure.</param>
/// <param name="configure">An action that configures the <see cref="RedisConfigurator"/> with Redis-specific settings.</param>
/// <returns>The <see cref="FluentBrighterBuilder"/> instance for method chaining.</returns>
/// <exception cref="ConfigurationException">Thrown when the Redis configuration is invalid or incomplete (e.g., no Redis connection configured).</exception>
public static FluentBrighterBuilder UsingRedis(this FluentBrighterBuilder builder,
Action<RedisConfigurator> configure)
{
var configurator = new RedisConfigurator();
configure(configurator);
configurator.SetFluentBrighter(builder);
return builder;
}
}
28 changes: 28 additions & 0 deletions src/Fluent.Brighter.Redis/Extensions/ProducersExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;

using Fluent.Brighter.Redis;

namespace Fluent.Brighter;

/// <summary>
/// Provides extension methods for <see cref="ProducerBuilder"/> to configure Redis-based message producers.
/// These extensions enable easy setup of Redis publications for message production.
/// </summary>
public static class ProducersExtensions
{
/// <summary>
/// Adds a Redis publication to the producer builder using a configuration action.
/// Publications define how messages are published to Redis, including topic mappings and message metadata.
/// </summary>
/// <param name="builder">The <see cref="ProducerBuilder"/> instance to configure.</param>
/// <param name="configure">An action that configures the <see cref="RedisMessageProducerFactoryBuilder"/> with publication settings.</param>
/// <returns>The <see cref="ProducerBuilder"/> instance for method chaining.</returns>
public static ProducerBuilder AddRedisPublication(
this ProducerBuilder builder,
Action<RedisMessageProducerFactoryBuilder> configure)
{
var factory = new RedisMessageProducerFactoryBuilder();
configure(factory);
return builder.AddMessageProducerFactory(factory.Build());
}
}
Loading
Loading