diff --git a/Directory.Packages.props b/Directory.Packages.props index 27b4f0a..fd1ca36 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -12,13 +12,16 @@ + + + @@ -27,25 +30,29 @@ + - + + + + diff --git a/Fluent.Brighter.sln b/Fluent.Brighter.sln index 532301b..e0e86e6 100644 --- a/Fluent.Brighter.sln +++ b/Fluent.Brighter.sln @@ -51,6 +51,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.Redis", "sr EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisSample", "samples\RedisSample\RedisSample.csproj", "{5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluent.Brighter.GoogleCloud", "src\Fluent.Brighter.GoogleCloud\Fluent.Brighter.GoogleCloud.csproj", "{98813670-F995-41CF-85AB-E12B27AEA35F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GcpSample", "samples\GcpSample\GcpSample.csproj", "{FF0AE58C-CFD7-4004-808A-BEF7B24D4997}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -313,6 +317,30 @@ Global {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x64.Build.0 = Release|Any CPU {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.ActiveCfg = Release|Any CPU {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F}.Release|x86.Build.0 = Release|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Debug|x64.ActiveCfg = Debug|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Debug|x64.Build.0 = Debug|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Debug|x86.ActiveCfg = Debug|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Debug|x86.Build.0 = Debug|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Release|Any CPU.Build.0 = Release|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Release|x64.ActiveCfg = Release|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Release|x64.Build.0 = Release|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Release|x86.ActiveCfg = Release|Any CPU + {98813670-F995-41CF-85AB-E12B27AEA35F}.Release|x86.Build.0 = Release|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Debug|x64.ActiveCfg = Debug|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Debug|x64.Build.0 = Debug|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Debug|x86.ActiveCfg = Debug|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Debug|x86.Build.0 = Debug|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Release|Any CPU.Build.0 = Release|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Release|x64.ActiveCfg = Release|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Release|x64.Build.0 = Release|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Release|x86.ActiveCfg = Release|Any CPU + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -339,5 +367,7 @@ Global {7DB6D8C1-5EDE-4EF1-953B-515C26164F9C} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA} {8DC14D04-F8D0-41CB-8964-B75257CEFE68} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} {5CA1F284-5C44-44DF-B5EB-3008A4CBD52F} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA} + {98813670-F995-41CF-85AB-E12B27AEA35F} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {FF0AE58C-CFD7-4004-808A-BEF7B24D4997} = {5D20AA90-6969-D8BD-9DCD-8634F4692FDA} EndGlobalSection EndGlobal diff --git a/README.md b/README.md index 09498ad..e18ebb1 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,57 @@ -# fluent-brighter -A Fluent API to configure Brighter +# Fluent Brighter +[![License](https://img.shields.io/badge/license-GPL-blue.svg)](LICENSE) +[![.NET](https://img.shields.io/badge/.NET-8.0%20%7C%209.0-512BD4)](https://dotnet.microsoft.com/) -# sample +A fluent, type-safe configuration API for [Paramore.Brighter](https://github.com/BrighterCommand/Brighter) - the command processor and message-oriented middleware framework for .NET. -```c# -services +## 📋 Overview + +Fluent Brighter provides an intuitive, strongly-typed builder pattern for configuring Paramore.Brighter's messaging infrastructure, eliminating boilerplate code and making complex configurations easier to read and maintain. + +### ✨ Key Features + +- **🎯 Type-Safe Configuration** - IntelliSense-driven API with compile-time safety +- **🔌 Multiple Message Brokers** - RabbitMQ, Kafka, AWS SNS/SQS, GCP Pub/Sub, Redis, RocketMQ +- **💾 Transactional Outbox Pattern** - DynamoDB, Firestore, Spanner, MongoDB, SQL Server, PostgreSQL, MySQL, SQLite +- **📥 Idempotent Inbox Pattern** - Prevent duplicate message processing +- **🔒 Distributed Locking** - Coordinate across multiple instances +- **📦 Large Message Storage** - S3, GCS for oversized payloads +- **⏰ Message Scheduling** - AWS EventBridge, Hangfire, Quartz integration +- **🎨 Fluent Syntax** - Readable, maintainable configuration code + +## 📦 Installation + +Install the base package and your chosen messaging provider(s): + +```bash +# Core package (required) +dotnet add package Fluent.Brighter + +# Choose your provider(s) +dotnet add package Fluent.Brighter.RMQ.Async # RabbitMQ +dotnet add package Fluent.Brighter.Kafka # Apache Kafka +dotnet add package Fluent.Brighter.AWS.V4 # AWS SNS/SQS +dotnet add package Fluent.Brighter.GoogleCloud # GCP Pub/Sub +dotnet add package Fluent.Brighter.Redis # Redis Streams +dotnet add package Fluent.Brighter.RocketMQ # Apache RocketMQ +dotnet add package Fluent.Brighter.Postgres # PostgreSQL +dotnet add package Fluent.Brighter.SqlServer # SQL Server +dotnet add package Fluent.Brighter.MySql # MySQL +dotnet add package Fluent.Brighter.Sqlite # SQLite +dotnet add package Fluent.Brighter.MongoDb # MongoDB +``` + +## 🚀 Quick Start + +### RabbitMQ Example + +```csharp +using Fluent.Brighter; +using Microsoft.Extensions.DependencyInjection; +using Paramore.Brighter.ServiceActivator.Extensions.Hosting; + +services .AddHostedService() .AddFluentBrighter(brighter => brighter .UsingRabbitMq(rabbitmq => rabbitmq @@ -15,21 +61,181 @@ services .UsePublications(pb => pb .AddPublication(p => p .SetTopic("greeting.event.topic") - .CreateTopicIfMissing()) + .CreateTopicIfMissing()) .AddPublication(p => p .SetTopic("farewell.event.topic") .CreateTopicIfMissing())) .UseSubscriptions(sb => sb .AddSubscription(s => s - .SetSubscription("paramore.example.greeting") + .SetSubscriptionName("paramore.example.greeting") .SetQueue("greeting.event.queue") .SetTopic("greeting.event.topic") .SetTimeout(TimeSpan.FromSeconds(200)) .EnableDurable() .EnableHighAvailability()) .AddSubscription(s => s - .SetSubscription("paramore.example.farewell") + .SetSubscriptionName("paramore.example.farewell") .SetQueue("farewell.event.queue") .SetTopic("farewell.event.topic"))) )); -``` \ No newline at end of file +``` + +## 📚 More Examples + +### AWS SNS/SQS with DynamoDB Outbox + +```csharp +services.AddFluentBrighter(brighter => brighter + .UsingAws(aws => aws + .SetConnection(conn => conn + .SetCredentials(new BasicAWSCredentials("key", "secret")) + .SetRegion(RegionEndpoint.USEast1)) + .UseSnsPublication(pb => pb + .AddPublication(p => p + .SetTopic("orders-created") + .CreateTopicIfMissing())) + .UseSqsSubscription(sb => sb + .AddSubscription(s => s + .SetSubscriptionName("order-processor") + .SetQueue("order-processing-queue") + .SetTopic("orders-created") + .SetMessagePumpType(MessagePumpType.Proactor) + .SetMakeChannels(OnMissingChannel.Create))) + .UseDynamoDbOutbox("outbox-table") + .UseDynamoDbInbox("inbox-table") + .UseDynamoDbOutboxArchive())); +``` + +### Google Cloud Platform (GCP) with Firestore + +```csharp +services.AddFluentBrighter(brighter => brighter + .UsingGcp(gcp => gcp + .SetProjectId("my-gcp-project") + .UsePubSubPublication(pb => pb + .AddPublication(p => p + .SetTopicAttributes(t => t.SetName("user-registered-topic")) + .SetSource("https://example.com/users")) + .AddPublication(p => p + .SetTopicAttributes(t => t.SetName("user-deleted-topic")) + .SetSource("https://example.com/users"))) + .UsePubSubSubscription(sb => sb + .AddSubscription(s => s + .SetSubscriptionName("user-registration-handler") + .SetTopicAttributes(t => t.SetName("user-registered-topic")) + .SetNoOfPerformers(5)) + .AddSubscription(s => s + .SetSubscriptionName("user-deletion-handler") + .SetTopicAttributes(t => t.SetName("user-deleted-topic")) + .SetNoOfPerformers(3))) + .UseFirestoreOutbox("outbox") + .UseFirestoreInbox("inbox") + .UseFirestoreOutboxArchive(cfg => cfg + .SetMinimumAge(TimeSpan.FromDays(7)) + .SetBatchSize(100)))); +``` + +### Apache Kafka + +```csharp +services.AddFluentBrighter(brighter => brighter + .UsingKafka(kafka => kafka + .SetBootstrapServers("localhost:9092") + .UsePublications(pb => pb + .AddPublication(p => p + .SetTopic("payments-processed") + .SetMessageIdHeaderKey("message-id"))) + .UseSubscriptions(sb => sb + .AddSubscription(s => s + .SetSubscriptionName("payment-notification-service") + .SetTopic("payments-processed") + .SetGroupId("payment-consumers") + .SetNoOfPerformers(10))))); +``` + +### PostgreSQL with Outbox Pattern + +```csharp +services.AddFluentBrighter(brighter => brighter + .UsingRabbitMq(rabbitmq => /* ... RabbitMQ config ... */) + .Producers(producer => producer + .UsePostgresOutbox(cfg => cfg + .SetConnectionString("Host=localhost;Database=brighter;") + .SetTableName("outbox"))) + .Subscriptions(sub => sub + .UsePostgresInbox(cfg => cfg + .SetConnectionString("Host=localhost;Database=brighter;") + .SetTableName("inbox"))) + .UseOutboxSweeper(cfg => cfg + .SetTimerInterval(TimeSpan.FromSeconds(30)) + .SetBatchSize(100))); +``` + +### SQL Server with Distributed Locking + +```csharp +services.AddFluentBrighter(brighter => brighter + .UsingRabbitMq(rabbitmq => /* ... RabbitMQ config ... */) + .Producers(producer => producer + .UseSqlServerOutbox(cfg => cfg + .SetConnectionString("Server=localhost;Database=Brighter;") + .SetTableName("Outbox")) + .UseSqlServerDistributedLock(cfg => cfg + .SetConnectionString("Server=localhost;Database=Brighter;") + .SetTableName("DistributedLock")))); +``` + +## 🎯 Key Concepts + +### Outbox Pattern +Ensures reliable message publishing by storing messages in a database transaction with your business data, then publishing them asynchronously. + +### Inbox Pattern +Prevents duplicate message processing by tracking received messages in a database, ensuring idempotency. + +### Distributed Locking +Coordinates message processing across multiple service instances to prevent concurrent execution. + +### Outbox Sweeper +Background service that publishes messages from the outbox and archives old messages. + +### Large Message Storage +Stores large message payloads (luggage) in cloud storage (S3, GCS) and passes references through the message broker. + +## 🔧 Configuration Options + +### Message Pump Types +- **Reactor** - Synchronous, single-threaded message processing +- **Proactor** - Asynchronous, concurrent message processing + +### Channel Creation +- **Create** - Automatically create topics/queues if missing +- **Assume** - Assume topics/queues already exist +- **Validate** - Validate existence and throw if missing + +### Subscription Options +- Number of performers (concurrent handlers) +- Timeout settings +- Retry policies +- Dead letter queues +- Message ordering +- Acknowledgment modes + +## 📖 Documentation + +For detailed documentation on Paramore.Brighter, visit: +- [Paramore.Brighter Documentation](https://brightercommand.github.io/Brighter/) +- [Paramore.Brighter GitHub](https://github.com/BrighterCommand/Brighter) + +## 🤝 Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. + +## 📄 License + +This project is licensed under the GPL 3 License - see the [LICENSE](LICENSE) file for details. + +## 🙏 Acknowledgments + +Built on top of the excellent [Paramore.Brighter](https://github.com/BrighterCommand/Brighter) framework by Ian Cooper and contributors. + diff --git a/samples/GcpSample/Commands/FarewellEvent.cs b/samples/GcpSample/Commands/FarewellEvent.cs new file mode 100644 index 0000000..0fd443d --- /dev/null +++ b/samples/GcpSample/Commands/FarewellEvent.cs @@ -0,0 +1,12 @@ +using Paramore.Brighter; + +namespace GcpSample.Commands; + +public class FarewellEvent(string farewell) : Event(Uuid.New()) +{ + public FarewellEvent() : this(string.Empty) + { + } + + public string Farewell { get; set; } = farewell; +} \ No newline at end of file diff --git a/samples/GcpSample/Commands/GreetingEvent.cs b/samples/GcpSample/Commands/GreetingEvent.cs new file mode 100644 index 0000000..b0750cc --- /dev/null +++ b/samples/GcpSample/Commands/GreetingEvent.cs @@ -0,0 +1,10 @@ +using Paramore.Brighter; + +namespace GcpSample.Commands; + +public class GreetingEvent(string greeting) : Event(Uuid.New()) +{ + public GreetingEvent() : this(string.Empty) { } + + public string Greeting { get; set; } = greeting; +} \ No newline at end of file diff --git a/samples/GcpSample/GcpSample.csproj b/samples/GcpSample/GcpSample.csproj new file mode 100644 index 0000000..d957171 --- /dev/null +++ b/samples/GcpSample/GcpSample.csproj @@ -0,0 +1,19 @@ + + + + Exe + net8.0 + enable + enable + false + + + + + + + + + + + diff --git a/samples/GcpSample/Handlers/FarewellEventHandler.cs b/samples/GcpSample/Handlers/FarewellEventHandler.cs new file mode 100644 index 0000000..a1cea11 --- /dev/null +++ b/samples/GcpSample/Handlers/FarewellEventHandler.cs @@ -0,0 +1,14 @@ +using GcpSample.Commands; + +using Paramore.Brighter; + +namespace GcpSample.Handlers; + +public class FarewellEventHandler : RequestHandlerAsync +{ + public override Task HandleAsync(FarewellEvent command, CancellationToken cancellationToken = new CancellationToken()) + { + Console.WriteLine($"---------Bye bye {command.Farewell}---------"); + return base.HandleAsync(command, cancellationToken); + } +} \ No newline at end of file diff --git a/samples/GcpSample/Handlers/GreetingEventHandler.cs b/samples/GcpSample/Handlers/GreetingEventHandler.cs new file mode 100644 index 0000000..eb567ae --- /dev/null +++ b/samples/GcpSample/Handlers/GreetingEventHandler.cs @@ -0,0 +1,14 @@ +using GcpSample.Commands; + +using Paramore.Brighter; + +namespace GcpSample.Handlers; + +public class GreetingEventHandler : RequestHandlerAsync +{ + public override Task HandleAsync(GreetingEvent command, CancellationToken cancellationToken = new CancellationToken()) + { + Console.WriteLine($"========Hello {command.Greeting}========"); + return base.HandleAsync(command, cancellationToken); + } +} \ No newline at end of file diff --git a/samples/GcpSample/Program.cs b/samples/GcpSample/Program.cs new file mode 100644 index 0000000..9d49682 --- /dev/null +++ b/samples/GcpSample/Program.cs @@ -0,0 +1,89 @@ +using Fluent.Brighter; + +using GcpSample.Commands; + +using Google.Apis.Auth.OAuth2; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +using Paramore.Brighter; +using Paramore.Brighter.ServiceActivator.Extensions.Hosting; + +using Serilog; + +Log.Logger = new LoggerConfiguration() + .MinimumLevel.Information() + .Enrich.FromLogContext() + .WriteTo.Console() + .CreateLogger(); + +var host = new HostBuilder() + .UseSerilog() + .ConfigureServices((_, services) => + { + services + .AddHostedService() + .AddFluentBrighter(brighter => brighter + .UseOutboxSweeper() + .UsingGcp(gcp => gcp + .SetConnection(c => c + .SetCredential(GoogleCredential.GetApplicationDefault()) + .SetProjectId(Environment.GetEnvironmentVariable("GOOGLE_CLOUD_PROJECT")!)) + .UsePubSubPublication(pb => pb + .AddPublication(p => p + .SetTopic("greeting-event-topic") + .SetSource("https://example.com/greeting")) + .AddPublication(p => p + .SetTopic("farewell-event-topic") + .SetSource("https://example.com/farewell"))) + .UsePubSubSubscription(sb => sb + .AddSubscription(s => s + .SetSubscriptionName("paramore.example.greeting") + .SetSubscription("greeting-event-queue") + .SetTopic("greeting-event-topic") + .SetNoOfPerformers(1)) + .AddSubscription(s => s + .SetSubscriptionName("paramore.example.farewell") + .SetSubscription("farewell-event-queue") + .SetTopic("farewell-event-topic") + .SetNoOfPerformers(1))) + .SetFirestoreConfiguration("brighter-firestore-database") + .UseFirestoreOutbox("outbox") + .UseFirestoreInbox("inbox") + // .UseFirestoreDistributedLock("locking") + .UseFirestoreOutboxArchive() + )); + }) + .Build(); + + +await host.StartAsync(); + +using var cts = new CancellationTokenSource(); + +Console.CancelKeyPress += (_, _) => cts.Cancel(); + +while (!cts.IsCancellationRequested) +{ + await Task.Delay(TimeSpan.FromSeconds(1)); + Console.Write("Say your name (or q to quit): "); + var name = Console.ReadLine(); + + if (string.IsNullOrEmpty(name)) + { + continue; + } + + if (name == "q") + { + break; + } + + var process = host.Services.GetRequiredService(); + await process.DepositPostAsync(new GreetingEvent(name)); + await process.DepositPostAsync(new FarewellEvent(name)); +} + + +await host.StopAsync(); diff --git a/samples/MongoDbSample/Program.cs b/samples/MongoDbSample/Program.cs index 1cb4920..88e3541 100644 --- a/samples/MongoDbSample/Program.cs +++ b/samples/MongoDbSample/Program.cs @@ -48,9 +48,9 @@ .SetConnection(c => c .SetConnectionString("mongodb://root:example@localhost:27017") .SetDatabaseName("brighter")) - .UseInbox() - .UseOutbox() - .UseDistributedLock() + .UseInbox("inbox") + .UseOutbox("outbox") + .UseDistributedLock("locking") .UseLuggageStore("bucket")) ); }) diff --git a/src/Fluent.Brighter.AWS.V4/Extensions/ProducerExtensions.cs b/src/Fluent.Brighter.AWS.V4/Extensions/ProducerExtensions.cs index 5dbc28c..c7bd9f2 100644 --- a/src/Fluent.Brighter.AWS.V4/Extensions/ProducerExtensions.cs +++ b/src/Fluent.Brighter.AWS.V4/Extensions/ProducerExtensions.cs @@ -58,7 +58,7 @@ public static ProducerBuilder UseDynamoDbOutbox(this ProducerBuilder builder, AWSMessagingGatewayConnection connection) { return builder - .UseDynamoDbOutbox(x => x.SetConnection(connection)); + .UseDynamoDbOutbox(x => x.SetConnection(connection)); } /// diff --git a/src/Fluent.Brighter.GoogleCloud/DeadLetterPolicyBuilder.cs b/src/Fluent.Brighter.GoogleCloud/DeadLetterPolicyBuilder.cs new file mode 100644 index 0000000..537c35d --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/DeadLetterPolicyBuilder.cs @@ -0,0 +1,123 @@ +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Google Cloud Pub/Sub Dead Letter Policy (DLQ). +/// Provides methods to set the dead letter topic, subscription, publisher/subscriber members, +/// acknowledgment deadline, and maximum delivery attempts. +/// +public sealed class DeadLetterPolicyBuilder +{ + private RoutingKey? _topicName; + + /// + /// Sets the routing key for the Dead Letter Topic (DLT) where messages that exceed the max delivery attempts are sent. + /// This topic must exist on Google Cloud Pub/Sub. + /// + /// The routing key for the dead letter topic + /// The builder instance for method chaining + public DeadLetterPolicyBuilder SetTopicName(RoutingKey topicName) + { + _topicName = topicName; + return this; + } + + private ChannelName? _subscription; + + /// + /// Sets the channel name of the main subscription that this dead letter policy applies to. + /// + /// The subscription channel name + /// The builder instance for method chaining + public DeadLetterPolicyBuilder SetSubscription(ChannelName subscription) + { + _subscription = subscription; + return this; + } + + private string? _publisherMember; + + /// + /// Sets the member field in the main message that identifies the publisher. + /// This is often used for logging or tracing purposes. + /// + /// The publisher member field name + /// The builder instance for method chaining + public DeadLetterPolicyBuilder SetPublisherMember(string? publisherMember) + { + _publisherMember = publisherMember; + return this; + } + + private string? _subscriberMember; + + /// + /// Sets the member field in the main message that identifies the subscriber. + /// This is often used for logging or tracing purposes. + /// + /// The subscriber member field name + /// The builder instance for method chaining + public DeadLetterPolicyBuilder SetSubscriberMember(string? subscriberMember) + { + _subscriberMember = subscriberMember; + return this; + } + + private int _ackDeadlineSeconds = 60; + + /// + /// Sets the number of seconds the subscriber has to acknowledge a message before Pub/Sub redelivers it. + /// This value is applied to the dead-letter subscription (the subscription on the DLT) if one is created by Brighter. + /// The default is 60 seconds, which is the Pub/Sub default. + /// + /// The acknowledgment deadline in seconds + /// The builder instance for method chaining + public DeadLetterPolicyBuilder SetAckDeadlineSeconds(int ackDeadlineSeconds) + { + _ackDeadlineSeconds = ackDeadlineSeconds; + return this; + } + + private int _maxDeliveryAttempts = 10; + + /// + /// Sets the maximum number of times Pub/Sub attempts to deliver a message before + /// sending it to the Dead Letter Topic. + /// The value must be between 5 and 100. The default is 10. + /// + /// The maximum number of delivery attempts + /// The builder instance for method chaining + public DeadLetterPolicyBuilder SetMaxDeliveryAttempts(int maxDeliveryAttempts) + { + _maxDeliveryAttempts = maxDeliveryAttempts; + return this; + } + + /// + /// Builds a DeadLetterPolicy instance with the configured values. + /// + /// A configured DeadLetterPolicy instance + /// Thrown when required configuration is missing + internal DeadLetterPolicy Build() + { + if (_topicName == null) + { + throw new ConfigurationException("Dead letter topic name not set"); + } + + if (_subscription == null) + { + throw new ConfigurationException("Subscription name not set"); + } + + return new DeadLetterPolicy(_topicName, _subscription) + { + PublisherMember = _publisherMember, + SubscriberMember = _subscriberMember, + AckDeadlineSeconds = _ackDeadlineSeconds, + MaxDeliveryAttempts = _maxDeliveryAttempts + }; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/ConsumerBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/ConsumerBuilderExtensions.cs new file mode 100644 index 0000000..470b2b3 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/ConsumerBuilderExtensions.cs @@ -0,0 +1,152 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter; +using Paramore.Brighter.Firestore; +using Paramore.Brighter.Inbox.Firestore; +using Paramore.Brighter.Inbox.Spanner; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +using SpannerInboxBuilder = Fluent.Brighter.GoogleCloud.SpannerInboxBuilder; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for configuring Google Cloud Platform services with the . +/// +public static class ConsumerBuilderExtensions +{ + /// + /// Adds a Google Cloud Pub/Sub subscription to the consumer builder. + /// + /// The consumer builder instance. + /// The configured to add. + /// The consumer builder for method chaining. + public static ConsumerBuilder AddPubSubSubscription(this ConsumerBuilder builder, GcpPubSubSubscription subscription) + { + return builder.AddSubscription(subscription); + } + + /// + /// Adds a Google Cloud Pub/Sub subscription to the consumer builder using a configuration action. + /// + /// The consumer builder instance. + /// An action to configure the . + /// The consumer builder for method chaining. + public static ConsumerBuilder AddPubSubSubscription(this ConsumerBuilder builder, + Action configure) + { + var subscription = new GcpPubSubSubscriptionBuilder(); + configure(subscription); + return builder.AddSubscription(subscription.Build()); + } + + /// + /// Adds a Google Cloud Pub/Sub subscription to the consumer builder with a strongly-typed request using a configuration action. + /// + /// The type of request that this subscription handles. + /// The consumer builder instance. + /// An action to configure the . + /// The consumer builder for method chaining. + public static ConsumerBuilder AddPubSubSubscription(this ConsumerBuilder builder, Action configure) + where TRequest : class, IRequest + { + var subscription = new GcpPubSubSubscriptionBuilder(); + subscription.SetDataType(typeof(TRequest)); + configure(subscription); + return builder.AddSubscription(subscription.Build()); + } + + /// + /// Adds a Google Cloud Pub/Sub channel factory to the consumer builder. + /// + /// The consumer builder instance. + /// An action to configure the . + /// The consumer builder for method chaining. + public static ConsumerBuilder AddPubSubChannelFactory(this ConsumerBuilder builder, Action configure) + { + var subscription = new GcpMessagingGatewayConnectionBuilder(); + configure(subscription); + return builder.AddChannelFactory(new GcpPubSubChannelFactory(subscription.Build())); + } + + /// + /// Configures the consumer to use Google Cloud Firestore as the inbox storage. + /// + /// The consumer builder instance. + /// An action to configure the . + /// The consumer builder for method chaining. + public static ConsumerBuilder UseFirestoreInbox(this ConsumerBuilder builder, + Action configure) + { + var configuration = new FirestoreInboxBuilder(); + configure(configuration); + return builder.UseFirestoreInbox(configuration.Build()); + } + + /// + /// Configures the consumer to use Google Cloud Firestore as the inbox storage using an existing gateway connection. + /// + /// The consumer builder instance. + /// The containing credentials and project information. + /// The consumer builder for method chaining. + public static ConsumerBuilder UseFirestoreInbox(this ConsumerBuilder builder, GcpMessagingGatewayConnection connection) + { + return builder.UseFirestoreInbox(cfg => cfg + .SetConfiguration(c => c + .SetCredential(connection.Credential) + .SetProjectId(connection.ProjectId) + .SetInbox(new FirestoreCollection{ Name = "inbox"}))); + } + + /// + /// Configures the consumer to use Google Cloud Firestore as the inbox storage with a pre-configured inbox instance. + /// + /// The consumer builder instance. + /// The configured instance. + /// The consumer builder for method chaining. + public static ConsumerBuilder UseFirestoreInbox(this ConsumerBuilder builder, FirestoreInbox inbox) + { + return builder.SetInbox(cfg => cfg.SetInbox(inbox)); + } + + /// + /// Configures the consumer to use Google Cloud Spanner as the inbox storage. + /// + /// The consumer builder instance. + /// An action to configure the . + /// The consumer builder for method chaining. + public static ConsumerBuilder UseSpannerInbox(this ConsumerBuilder builder, + Action configure) + { + var configuration = new SpannerInboxBuilder(); + configure(configuration); + return builder.UseSpannerInbox(configuration.Build()); + } + + /// + /// Configures the consumer to use Google Cloud Spanner as the inbox storage with a connection string. + /// + /// The consumer builder instance. + /// The Spanner connection string. + /// The consumer builder for method chaining. + public static ConsumerBuilder UseSpannerInbox(this ConsumerBuilder builder, string connectionString) + { + return builder.UseSpannerInbox(cfg => cfg + .SetConfiguration(c => c + .SetConnectionString(connectionString) + .SetInboxTableName("inbox"))); + } + + /// + /// Configures the consumer to use Google Cloud Spanner as the inbox storage with a pre-configured inbox instance. + /// + /// The consumer builder instance. + /// The configured instance. + /// The consumer builder for method chaining. + public static ConsumerBuilder UseSpannerInbox(this ConsumerBuilder builder, SpannerInboxAsync inbox) + { + return builder.SetInbox(cfg => cfg.SetInbox(inbox)); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreConfigurationBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreConfigurationBuilderExtensions.cs new file mode 100644 index 0000000..0b3d1d2 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreConfigurationBuilderExtensions.cs @@ -0,0 +1,96 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +namespace Fluent.Brighter; + +/// +/// Extension methods for to provide convenient configuration +/// of Firestore collections using builder pattern callbacks. +/// +public static class FirestoreConfigurationBuilderExtensions +{ + /// + /// Sets the inbox Firestore collection name. + /// + /// The Firestore configuration builder instance + /// The name of the inbox collection + /// The Firestore configuration builder instance for method chaining + public static FirestoreConfigurationBuilder SetInbox( + this FirestoreConfigurationBuilder builder, + string tableName) + { + return builder.SetInbox(c => c.SetName(tableName)); + } + + /// + /// Sets the inbox Firestore collection using a fluent configuration callback. + /// + /// The Firestore configuration builder instance + /// An action to configure the inbox collection builder + /// The Firestore configuration builder instance for method chaining + public static FirestoreConfigurationBuilder SetInbox( + this FirestoreConfigurationBuilder builder, + Action configure) + { + var collection = new FirestoreCollectionBuilder(); + configure(collection); + return builder.SetInbox(collection.Build()); + } + + /// + /// Sets the outbox Firestore collection name. + /// + /// The Firestore configuration builder instance + /// The name of the outbox collection + /// The Firestore configuration builder instance for method chaining + public static FirestoreConfigurationBuilder SetOutbox( + this FirestoreConfigurationBuilder builder, + string tableName) + { + return builder.SetOutbox(c => c.SetName(tableName)); + } + + /// + /// Sets the outbox Firestore collection using a fluent configuration callback. + /// + /// The Firestore configuration builder instance + /// An action to configure the outbox collection builder + /// The Firestore configuration builder instance for method chaining + public static FirestoreConfigurationBuilder SetOutbox( + this FirestoreConfigurationBuilder builder, + Action configure) + { + var collection = new FirestoreCollectionBuilder(); + configure(collection); + return builder.SetOutbox(collection.Build()); + } + + /// + /// Sets the locking Firestore collection name. + /// + /// The Firestore configuration builder instance + /// The name of the locking collection + /// The Firestore configuration builder instance for method chaining + public static FirestoreConfigurationBuilder SetLocking( + this FirestoreConfigurationBuilder builder, + string tableName) + { + return builder.SetLocking(c => c.SetName(tableName)); + } + + /// + /// Sets the locking Firestore collection using a fluent configuration callback. + /// + /// The Firestore configuration builder instance + /// An action to configure the locking collection builder + /// The Firestore configuration builder instance for method chaining + public static FirestoreConfigurationBuilder SetLocking( + this FirestoreConfigurationBuilder builder, + Action configure) + { + var collection = new FirestoreCollectionBuilder(); + configure(collection); + return builder.SetLocking(collection.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreInboxBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreInboxBuilderExtensions.cs new file mode 100644 index 0000000..7852325 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreInboxBuilderExtensions.cs @@ -0,0 +1,26 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +namespace Fluent.Brighter; + +/// +/// Extension methods for to provide convenient configuration +/// of Firestore settings using builder pattern callbacks. +/// +public static class FirestoreInboxBuilderExtensions +{ + /// + /// Sets the Firestore configuration using a fluent configuration callback. + /// + /// The Firestore inbox builder instance + /// An action to configure the Firestore configuration builder + /// The Firestore inbox builder instance for method chaining + public static FirestoreInboxBuilder SetConfiguration(this FirestoreInboxBuilder builder, + Action configure) + { + var configuration = new FirestoreConfigurationBuilder(); + configure(configuration); + return builder.SetConfiguration(configuration.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreLockingBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreLockingBuilderExtensions.cs new file mode 100644 index 0000000..74936f5 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreLockingBuilderExtensions.cs @@ -0,0 +1,36 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +namespace Fluent.Brighter; + +/// +/// Extension methods for to provide fluent configuration options. +/// +public static class FirestoreLockingBuilderExtensions +{ + /// + /// Sets the Firestore configuration using a fluent configuration builder. + /// + /// The instance. + /// An action to configure the . + /// The builder instance for method chaining. + /// Thrown when or is null. + /// + /// + /// builder.SetConfiguration(config => config + /// .SetProjectId("my-project") + /// .SetDatabase("(default)") + /// .SetLocking(locking => locking + /// .SetName("locks") + /// .SetTtl(TimeSpan.FromMinutes(5)))); + /// + /// + public static FirestoreLockingBuilder SetConfiguration(this FirestoreLockingBuilder builder, + Action configure) + { + var configuration = new FirestoreConfigurationBuilder(); + configure(configuration); + return builder.SetConfiguration(configuration.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreOutboxBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreOutboxBuilderExtensions.cs new file mode 100644 index 0000000..ba31dc4 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/FirestoreOutboxBuilderExtensions.cs @@ -0,0 +1,28 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter.Firestore; + +namespace Fluent.Brighter; + +/// +/// Extension methods for to provide convenient configuration +/// of Firestore settings using builder pattern callbacks. +/// +public static class FirestoreOutboxBuilderExtensions +{ + /// + /// Sets the Firestore configuration using a fluent configuration callback. + /// + /// The Firestore outbox builder instance + /// An action to configure the Firestore configuration builder + /// The Firestore outbox builder instance for method chaining + public static FirestoreOutboxBuilder SetConfiguration(this FirestoreOutboxBuilder builder, + Action configure) + { + var configuration = new FirestoreConfigurationBuilder(); + configure(configuration); + return builder.SetConfiguration(configuration.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/FluentBrighterExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/FluentBrighterExtensions.cs new file mode 100644 index 0000000..921f196 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/FluentBrighterExtensions.cs @@ -0,0 +1,87 @@ +using System; +using System.Data.Common; + +using Fluent.Brighter.GoogleCloud; + +using Google.Cloud.Firestore.V1; + +using Paramore.Brighter; + +namespace Fluent.Brighter; + +/// +/// Extension methods for FluentBrighterBuilder to provide GCP-specific configurations +/// and Firestore/Spanner outbox archiving functionality. +/// +public static class FluentBrighterExtensions +{ + /// + /// Configures Google Cloud Platform services (Pub/Sub, Firestore, Spanner, GCS) for use with Paramore.Brighter + /// using a fluent configuration pattern. + /// + /// The FluentBrighterBuilder instance + /// Action to configure GCP services + /// The FluentBrighterBuilder instance for method chaining + public static FluentBrighterBuilder UsingGcp(this FluentBrighterBuilder builder, + Action configure) + { + var configurator = new GcpConfigurator(); + configure(configurator); + configurator.SetFluentBrighter(builder); + return builder; + } + + #region Outbox + + /// + /// Configures Firestore as the transaction outbox archive store using default settings. + /// Note: Currently uses a null archive provider implementation. + /// + /// The FluentBrighterBuilder instance + /// The FluentBrighterBuilder instance for method chaining + public static FluentBrighterBuilder UseFirestoreTransactionOutboxArchive(this FluentBrighterBuilder builder) + { + return builder.UseOutboxArchiver(new NullOutboxArchiveProvider()); + } + + /// + /// Configures Firestore as the transaction outbox archive store with custom settings. + /// Note: Currently uses a null archive provider implementation. + /// + /// The FluentBrighterBuilder instance + /// Action to configure outbox archiver options + /// The FluentBrighterBuilder instance for method chaining + public static FluentBrighterBuilder UseFirestoreTransactionOutboxArchive(this FluentBrighterBuilder builder, Action configure) + { + var options = new TimedOutboxArchiverOptionsBuilder(); + configure(options); + return builder.UseOutboxArchiver(new NullOutboxArchiveProvider(), options.Build()); + } + + /// + /// Configures Cloud Spanner as the transaction outbox archive store using default settings. + /// Note: Currently uses a null archive provider implementation. + /// + /// The FluentBrighterBuilder instance + /// The FluentBrighterBuilder instance for method chaining + public static FluentBrighterBuilder UseSpannerTransactionOutboxArchive(this FluentBrighterBuilder builder) + { + return builder.UseOutboxArchiver(new NullOutboxArchiveProvider()); + } + + /// + /// Configures Cloud Spanner as the transaction outbox archive store with custom settings. + /// Note: Currently uses a null archive provider implementation. + /// + /// The FluentBrighterBuilder instance + /// Action to configure outbox archiver options + /// The FluentBrighterBuilder instance for method chaining + public static FluentBrighterBuilder UseSpannerTransactionOutboxArchive(this FluentBrighterBuilder builder, Action configure) + { + var options = new TimedOutboxArchiverOptionsBuilder(); + configure(options); + return builder.UseOutboxArchiver(new NullOutboxArchiveProvider(), options.Build()); + } + + #endregion +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPubSubSubscriptionBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPubSubSubscriptionBuilderExtensions.cs new file mode 100644 index 0000000..b835a84 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPubSubSubscriptionBuilderExtensions.cs @@ -0,0 +1,161 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for configuring Google Cloud Pub/Sub subscriptions using a fluent API. +/// +public static class GcpPubSubSubscriptionBuilderExtensions +{ + /// + /// Configures the Google Cloud Pub/Sub topic attributes using a fluent builder. + /// This allows setting properties like project ID, topic name, labels, retention duration, + /// schema settings, and encryption configuration for the topic associated with this subscription. + /// + /// The GCP Pub/Sub subscription builder instance + /// An action to configure the topic attributes using the builder + /// The configured instance for method chaining + /// + /// + /// builder.SetTopicAttributes(attrs => + /// { + /// attrs.SetName("my-topic") + /// .SetProjectId("my-project") + /// .AddLabel("environment", "production"); + /// }); + /// + /// + public static GcpPubSubSubscriptionBuilder SetTopicAttributes(this GcpPubSubSubscriptionBuilder builder, + Action configure) + { + var attrs = new TopicAttributeBuilder(); + configure(attrs); + return builder.SetTopicAttributes(attrs.Build()); + } + + /// + /// Enables message ordering for the subscription. + /// When enabled, messages published to the topic are delivered in the order they were published, + /// provided they were published with an ordering key. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder EnableMessageOrdering(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetEnableMessageOrdering(true); + } + + /// + /// Disables message ordering for the subscription. + /// Messages will be delivered in the order received by Pub/Sub, which may not match publishing order. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder DisableMessageOrdering(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetEnableMessageOrdering(false); + } + + /// + /// Enables exactly-once delivery for the subscription. + /// When enabled, Pub/Sub guarantees that each message is delivered and acknowledged exactly once, + /// preventing duplicate processing. This feature may increase latency and cost. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder EnableExactlyOnceDelivery(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetEnableExactlyOnceDelivery(true); + } + + /// + /// Disables exactly-once delivery for the subscription. + /// Messages may be delivered more than once (at-least-once delivery semantics). + /// This is the default behavior and offers better performance and lower cost. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder DisableExactlyOnceDelivery(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetEnableExactlyOnceDelivery(false); + } + + /// + /// Configures the subscription to use streaming mode for message consumption. + /// In streaming mode, Pub/Sub pushes messages to the subscriber as they become available, + /// providing lower latency. This is the default and recommended mode for most use cases. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder UseStreamMode(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetSubscriptionMode(SubscriptionMode.Stream); + } + + /// + /// Configures the subscription to use pull mode for message consumption. + /// In pull mode, the subscriber explicitly requests messages from Pub/Sub, + /// providing more control over message flow and processing rate. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder UsePullMode(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetSubscriptionMode(SubscriptionMode.Pull); + } + + /// + /// Configures the Dead Letter Policy (DLQ) for the subscription using a fluent builder. + /// Messages that fail processing after a specified number of delivery attempts will be + /// forwarded to the configured dead letter topic for manual inspection or reprocessing. + /// + /// The GCP Pub/Sub subscription builder instance + /// An action to configure the dead letter policy using the builder + /// The configured instance for method chaining + /// + /// + /// builder.SetDeadLetter(dlq => + /// { + /// dlq.SetTopicName(new RoutingKey("my-dlq-topic")) + /// .SetSubscription(new ChannelName("my-subscription")) + /// .SetMaxDeliveryAttempts(5); + /// }); + /// + /// + public static GcpPubSubSubscriptionBuilder SetDeadLetter(this GcpPubSubSubscriptionBuilder builder, + Action configure) + { + var policy = new DeadLetterPolicyBuilder(); + configure(policy); + return builder.SetDeadLetter(policy.Build()); + } + + /// + /// Configures the subscription to use the Proactor message pump type for asynchronous message processing. + /// The Proactor pattern processes messages using async/await, allowing for concurrent message handling + /// and better resource utilization. This is the recommended mode for async pipelines. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder UseProactor(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetMessagePumpType(MessagePumpType.Proactor); + } + + /// + /// Configures the subscription to use the Reactor message pump type for synchronous message processing. + /// The Reactor pattern processes messages synchronously in a single-threaded manner. + /// Use this mode for synchronous pipelines or when you need predictable, sequential processing. + /// + /// The GCP Pub/Sub subscription builder instance + /// The configured instance for method chaining + public static GcpPubSubSubscriptionBuilder UseReactor(this GcpPubSubSubscriptionBuilder builder) + { + return builder.SetMessagePumpType(MessagePumpType.Reactor); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPublicationBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPublicationBuilderExtensions.cs new file mode 100644 index 0000000..441cbff --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPublicationBuilderExtensions.cs @@ -0,0 +1,103 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for configuring Google Cloud Pub/Sub publication settings using a fluent API. +/// +public static class GcpPublicationBuilderExtensions +{ + /// + /// Configures the Google Cloud Pub/Sub topic attributes using a fluent builder. + /// This allows setting properties like project ID, topic name, labels, retention duration, + /// schema settings, and encryption configuration. + /// + /// The GCP publication builder instance + /// An action to configure the topic attributes using the builder + /// The configured instance for method chaining + /// + /// + /// builder.SetTopicAttributes(attrs => + /// { + /// attrs.SetName("my-topic") + /// .SetProjectId("my-project") + /// .AddLabel("environment", "production"); + /// }); + /// + /// + public static GcpPublicationBuilder SetTopicAttributes(this GcpPublicationBuilder builder, + Action configure) + { + var attrs = new TopicAttributeBuilder(); + configure(attrs); + return builder.SetTopicAttributes(attrs.Build()); + } + + /// + /// Sets the data schema URI from a string value for CloudEvents metadata. + /// Identifies the schema that data adheres to. If the string is null or empty, sets the data schema to null. + /// + /// The GCP publication builder instance + /// The data schema URI as a string, or null + /// The configured instance for method chaining + public static GcpPublicationBuilder SetDataSchema(this GcpPublicationBuilder builder, string? dataSchema) + { + if (string.IsNullOrEmpty(dataSchema)) + { + return builder.SetDataSchema(null); + } + + return builder.SetDataSchema(new Uri(dataSchema, UriKind.RelativeOrAbsolute)); + } + + /// + /// Sets the source URI from a string value for CloudEvents metadata. + /// Identifies the context in which an event happened, such as the event source organization or process. + /// + /// The GCP publication builder instance + /// The source URI as a string + /// The configured instance for method chaining + public static GcpPublicationBuilder SetSource(this GcpPublicationBuilder builder, string dataSchema) + { + return builder.SetSource(new Uri(dataSchema, UriKind.RelativeOrAbsolute)); + } + + /// + /// Configures the publication to create the Pub/Sub topic if it doesn't exist. + /// This sets the make channels policy to . + /// + /// The GCP publication builder instance + /// The configured instance for method chaining + public static GcpPublicationBuilder CreateQueueIfMissing(this GcpPublicationBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Create); + } + + /// + /// Configures the publication to validate that the Pub/Sub topic exists without creating it. + /// This sets the make channels policy to . + /// If the topic doesn't exist, an error will be raised. + /// + /// The GCP publication builder instance + /// The configured instance for method chaining + public static GcpPublicationBuilder ValidIfQueueExists(this GcpPublicationBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Validate); + } + + /// + /// Configures the publication to assume the Pub/Sub topic exists without validation or creation. + /// This sets the make channels policy to . + /// Use this when the topic is managed externally or already known to exist. + /// + /// The GCP publication builder instance + /// The configured instance for method chaining + public static GcpPublicationBuilder AssumeQueueExists(this GcpPublicationBuilder builder) + { + return builder.SetMakeChannels(OnMissingChannel.Assume); + } +} diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPublicationFactoryBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPublicationFactoryBuilderExtensions.cs new file mode 100644 index 0000000..af2b157 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/GcpPublicationFactoryBuilderExtensions.cs @@ -0,0 +1,85 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter; + +namespace Fluent.Brighter; + +/// +/// Extension methods for to provide a fluent configuration API. +/// +public static class GcpPublicationFactoryBuilderExtensions +{ + /// + /// Sets the Google Cloud Pub/Sub messaging gateway connection using a builder action. + /// This overload allows configuring the connection inline without creating a separate connection object. + /// + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// + /// builder.SetConnection(conn => conn + /// .SetProjectId("my-gcp-project") + /// .SetCredential(GoogleCredential.GetApplicationDefault())); + /// + /// + public static GcpPublicationFactoryBuilder SetConnection(this GcpPublicationFactoryBuilder builder, + Action configure) + { + var connection = new GcpMessagingGatewayConnectionBuilder(); + configure(connection); + return builder.SetConnection(connection.Build()); + } + + /// + /// Adds a message publication configuration to the Google Cloud Pub/Sub message producer factory. + /// This overload allows configuring publication settings using a builder action without specifying a request type. + /// + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// + /// builder.AddPublication(pub => pub + /// .SetTopicAttributes(attrs => attrs.SetName("my-topic")) + /// .SetSource("https://example.com/events")); + /// + /// + public static GcpPublicationFactoryBuilder AddPublication(this GcpPublicationFactoryBuilder builder, + Action configure) + { + var connection = new GcpPublicationBuilder(); + configure(connection); + return builder.AddPublication(connection.Build()); + } + + /// + /// Adds a message publication configuration to the Google Cloud Pub/Sub message producer factory for a specific request type. + /// This overload automatically associates the publication with the specified request type using generics. + /// + /// The type of request (command or event) this publication handles. Must implement . + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// + /// builder.AddPublication<MyCommand>(pub => pub + /// .SetTopicAttributes(attrs => attrs + /// .SetName("my-command-topic") + /// .SetLabels(new Dictionary<string, string> { ["env"] = "production" })) + /// .SetSource("https://example.com/commands") + /// .SetSubject("MyCommand")); + /// + /// + public static GcpPublicationFactoryBuilder AddPublication(this GcpPublicationFactoryBuilder builder, + Action configure) + where T : class, IRequest + { + var connection = new GcpPublicationBuilder(); + connection.SetRequestType(typeof(T)); + configure(connection); + return builder.AddPublication(connection.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/GcsLuggageStoreBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/GcsLuggageStoreBuilderExtensions.cs new file mode 100644 index 0000000..ec35ab9 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/GcsLuggageStoreBuilderExtensions.cs @@ -0,0 +1,42 @@ +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter.Transforms.Storage; + +namespace Fluent.Brighter; + +/// +/// Extension methods for to provide convenient configuration +/// of storage strategies for Google Cloud Storage bucket handling. +/// +public static class GcsLuggageStoreBuilderExtensions +{ + /// + /// Sets the storage strategy to create the GCS bucket if it doesn't exist. + /// + /// The GCS luggage store builder instance + /// The GCS luggage store builder instance for method chaining + public static GcsLuggageStoreBuilder CreateIfMissing(this GcsLuggageStoreBuilder builder) + { + return builder.SetStrategy(StorageStrategy.CreateIfMissing); + } + + /// + /// Sets the storage strategy to validate that the GCS bucket exists. + /// + /// The GCS luggage store builder instance + /// The GCS luggage store builder instance for method chaining + public static GcsLuggageStoreBuilder ValidIfGcsExists(this GcsLuggageStoreBuilder builder) + { + return builder.SetStrategy(StorageStrategy.Validate); + } + + /// + /// Sets the storage strategy to assume the GCS bucket exists (no validation). + /// + /// The GCS luggage store builder instance + /// The GCS luggage store builder instance for method chaining + public static GcsLuggageStoreBuilder AssumeGcsExists(this GcsLuggageStoreBuilder builder) + { + return builder.SetStrategy(StorageStrategy.Assume); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/LuggageStoreBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/LuggageStoreBuilderExtensions.cs new file mode 100644 index 0000000..c5e5906 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/LuggageStoreBuilderExtensions.cs @@ -0,0 +1,39 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter.Transformers.Gcp; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for configuring Google Cloud Storage as a luggage store with the . +/// +public static class LuggageStoreBuilderExtensions +{ + /// + /// Configures the luggage store to use Google Cloud Storage (GCS) with a builder pattern. + /// + /// The luggage store builder instance. + /// An action to configure the . + /// The luggage store builder for method chaining. + public static LuggageStoreBuilder UseGcsLuggageStore(this LuggageStoreBuilder builder, + Action configure) + { + var store = new GcsLuggageStoreBuilder(); + configure(store); + return builder; + } + + /// + /// Configures the luggage store to use a pre-configured Google Cloud Storage (GCS) luggage store instance. + /// + /// The luggage store builder instance. + /// The configured instance. + /// The luggage store builder for method chaining. + public static LuggageStoreBuilder UseGcsLuggageStore(this LuggageStoreBuilder builder, GcsLuggageStore store) + { + builder.UseLuggageStore(store); + return builder; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/ProducerExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/ProducerExtensions.cs new file mode 100644 index 0000000..bfa8e58 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/ProducerExtensions.cs @@ -0,0 +1,192 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +using Paramore.Brighter; +using Paramore.Brighter.Firestore; +using Paramore.Brighter.Locking.Firestore; +using Paramore.Brighter.Outbox.Firestore; + +namespace Fluent.Brighter; + +/// +/// Extension methods for to configure Google Cloud Platform components. +/// +public static class ProducerExtensions +{ + /// + /// Adds Google Cloud Pub/Sub publication support to the producer. + /// + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// + /// builder.AddGcpPubSubPublication(pub => pub + /// .SetConnection(conn => conn.SetProjectId("my-project")) + /// .SetPublication(new GcpPublication { Topic = "my-topic" })); + /// + /// + public static ProducerBuilder AddGcpPubSubPublication(this ProducerBuilder builder, + Action configure) + { + var factory = new GcpPublicationFactoryBuilder(); + configure(factory); + return builder.AddMessageProducerFactory(factory.Build()); + } + + + #region Outbox + + #region Firestore + + /// + /// Configures the producer to use Google Cloud Firestore as the outbox for storing outgoing messages. + /// + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// This automatically sets up the Firestore connection provider and unit of work for transactional outbox support. + /// + /// + /// + /// builder.UseFirestoreOutbox(outbox => outbox + /// .SetConfiguration(firestoreConfig)); + /// + /// + public static ProducerBuilder UseFirestoreOutbox(this ProducerBuilder builder, + Action configure) + { + var outbox = new FirestoreOutboxBuilder(); + configure(outbox); + return builder.UseFirestoreOutbox(outbox.Build()); + } + + /// + /// Configures the producer to use a pre-built Google Cloud Firestore outbox instance. + /// + /// The instance. + /// The instance to use. + /// The instance for method chaining. + /// + /// This automatically sets up the Firestore connection provider and unit of work for transactional outbox support. + /// + public static ProducerBuilder UseFirestoreOutbox(this ProducerBuilder builder, FirestoreOutbox outbox) + { + return builder.SetOutbox(outbox) + .SetConnectionProvider(typeof(FirestoreConnectionProvider)) + .SetTransactionProvider(typeof(FirestoreUnitOfWork)); + } + + #endregion + + #region Spanner + + /// + /// Configures the producer to use Google Cloud Spanner as the outbox with relational database configuration. + /// + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// This overload provides a simplified configuration approach for Spanner outbox using relational database settings. + /// + /// + /// + /// builder.UseSpannerOutbox(config => config + /// .SetConnectionString("Data Source=projects/my-project/instances/my-instance/databases/my-database")); + /// + /// + public static ProducerBuilder UseSpannerOutbox(this ProducerBuilder builder, + Action configure) + { + var configuration = new RelationalDatabaseConfigurationBuilder(); + configure(configuration); + return builder.UseSpannerOutbox(configuration.Build()); + } + + /// + /// Configures the producer to use Google Cloud Spanner as the outbox with a pre-built configuration. + /// + /// The instance. + /// The for Spanner. + /// The instance for method chaining. + public static ProducerBuilder UseSpannerOutbox(this ProducerBuilder builder, + RelationalDatabaseConfiguration configuration) + { + return builder.UseSpannerOutbox(cfg => cfg.SetConfiguration(configuration)); + } + + /// + /// Configures the producer to use Google Cloud Spanner as the outbox with detailed builder configuration. + /// + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// This is the most flexible overload, allowing full control over Spanner outbox configuration. + /// + /// + /// + /// builder.UseSpannerOutbox(outbox => outbox + /// .SetConfiguration(spannerConfig)); + /// + /// + public static ProducerBuilder UseSpannerOutbox(this ProducerBuilder builder, + Action configuration) + { + var outbox = new SpannerOutboxBuilder(); + configuration(outbox); + builder.SetOutbox(outbox.Build()); + return builder; + } + + #endregion + + #endregion + + + #region Distributed lock + + /// + /// Configures the producer to use Google Cloud Firestore as the distributed lock provider. + /// + /// The instance. + /// An action to configure the . + /// The instance for method chaining. + /// + /// Distributed locks are used to coordinate access to shared resources across multiple application instances. + /// Firestore's transactional guarantees ensure that only one process can acquire a lock at a time. + /// + /// + /// + /// builder.UseFirestoreDistributedLock(lock => lock + /// .SetConfiguration(firestoreConfig)); + /// + /// + public static ProducerBuilder UseFirestoreDistributedLock(this ProducerBuilder builder, + Action configure) + { + var locking = new FirestoreLockingBuilder(); + configure(locking); + return builder.UseFirestoreDistributedLock(locking.Build()); + } + + /// + /// Configures the producer to use a pre-built Firestore distributed lock instance. + /// + /// The instance. + /// The instance to use. + /// The instance for method chaining. + /// + /// Use this overload when you have already created and configured a instance. + /// + public static ProducerBuilder UseFirestoreDistributedLock(this ProducerBuilder builder, FirestoreDistributedLock locking) + { + return builder.SetDistributedLock(locking); + } + + #endregion + +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/SpannerInboxBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/SpannerInboxBuilderExtensions.cs new file mode 100644 index 0000000..8a6e37b --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/SpannerInboxBuilderExtensions.cs @@ -0,0 +1,25 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for configuring Spanner inbox functionality. +/// +public static class SpannerInboxBuilderExtensions +{ + /// + /// Configures the Spanner inbox using a fluent configuration builder. + /// + /// The Spanner inbox builder instance. + /// An action to configure the relational database settings. + /// The configured instance for method chaining. + public static SpannerInboxBuilder SetConfiguration(this SpannerInboxBuilder builder, + Action configure) + { + var configuration = new RelationalDatabaseConfigurationBuilder(); + configure(configuration); + return builder.SetConfiguration(configuration.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Extensions/SpannerOutboxBuilderExtensions.cs b/src/Fluent.Brighter.GoogleCloud/Extensions/SpannerOutboxBuilderExtensions.cs new file mode 100644 index 0000000..fbfd428 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Extensions/SpannerOutboxBuilderExtensions.cs @@ -0,0 +1,25 @@ +using System; + +using Fluent.Brighter.GoogleCloud; + +namespace Fluent.Brighter; + +/// +/// Provides extension methods for configuring Spanner outbox functionality. +/// +public static class SpannerOutboxBuilderExtensions +{ + /// + /// Configures the Spanner outbox using a fluent configuration builder. + /// + /// The Spanner outbox builder instance. + /// An action to configure the relational database settings. + /// The configured instance for method chaining. + public static SpannerOutboxBuilder SetConfiguration(this SpannerOutboxBuilder builder, + Action configure) + { + var configuration = new RelationalDatabaseConfigurationBuilder(); + configure(configuration); + return builder.SetConfiguration(configuration.Build()); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/FirestoreCollectionBuilder.cs b/src/Fluent.Brighter.GoogleCloud/FirestoreCollectionBuilder.cs new file mode 100644 index 0000000..3f0fcd4 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/FirestoreCollectionBuilder.cs @@ -0,0 +1,53 @@ +using System; + +using Paramore.Brighter.Firestore; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Firestore collection. +/// Provides methods to set collection name and TTL (Time To Live) settings. +/// +public sealed class FirestoreCollectionBuilder +{ + private string _name = string.Empty; + + /// + /// Sets the name of the Firestore collection. + /// + /// The collection name + /// The builder instance for method chaining + public FirestoreCollectionBuilder SetName(string name) + { + _name = name; + return this; + } + + private TimeSpan? _ttl; + + /// + /// Sets the TTL (Time To Live) for documents in the Firestore collection. + /// Determines how long documents should be retained before automatic deletion. + /// + /// The time to live duration + /// The builder instance for method chaining + public FirestoreCollectionBuilder SetTtl(TimeSpan? ttl) + { + _ttl = ttl; + return this; + } + + /// + /// Builds the FirestoreCollection instance with the configured options. + /// + /// A configured instance + public FirestoreCollection Build() + { + return new FirestoreCollection + { + Name = _name, + Ttl = _ttl + }; + } +} + diff --git a/src/Fluent.Brighter.GoogleCloud/FirestoreConfigurationBuilder.cs b/src/Fluent.Brighter.GoogleCloud/FirestoreConfigurationBuilder.cs new file mode 100644 index 0000000..c11965a --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/FirestoreConfigurationBuilder.cs @@ -0,0 +1,168 @@ +using System; + +using Google.Apis.Auth.OAuth2; +using Google.Cloud.Firestore.V1; + +using Paramore.Brighter; +using Paramore.Brighter.Firestore; +using Paramore.Brighter.Observability; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Firestore configuration for Paramore.Brighter. +/// Provides methods to set project ID, database, collections, credentials, and other Firestore settings. +/// +public sealed class FirestoreConfigurationBuilder +{ + private string? _projectId; + + /// + /// Sets the Google Cloud project ID. + /// + /// The Google Cloud project ID + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetProjectId(string projectId) + { + _projectId = projectId; + return this; + } + + private string? _database; + + /// + /// Sets the Firestore database ID (e.g., "(default)"). + /// + /// The Firestore database ID + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetDatabase(string database) + { + _database = database; + return this; + } + + private FirestoreCollection? _inbox; + + /// + /// Sets the default inbox Firestore collection. + /// + /// The inbox Firestore collection + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetInbox(FirestoreCollection inbox) + { + _inbox = inbox; + return this; + } + + private FirestoreCollection? _outbox; + + /// + /// Sets the default outbox Firestore collection. + /// + /// The outbox Firestore collection + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetOutbox(FirestoreCollection outbox) + { + _outbox = outbox; + return this; + } + + private FirestoreCollection? _locking; + + /// + /// Sets the default locking Firestore collection. + /// + /// The locking Firestore collection + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetLocking(FirestoreCollection locking) + { + _locking = locking; + return this; + } + + private TimeProvider _timeProvider = TimeProvider.System; + + /// + /// Sets the to use for timestamp generation. + /// Defaults to . + /// + /// The time provider + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetTimeProvider(TimeProvider timeProvider) + { + _timeProvider = timeProvider; + return this; + } + + private ICredential? _credential; + + /// + /// Sets the Google credential to use for authentication. + /// If not set, Application Default Credentials will be used. + /// + /// The Google credential + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetCredential(ICredential? credential) + { + _credential = credential; + return this; + } + + private InstrumentationOptions _instrumentation = InstrumentationOptions.All; + + /// + /// Sets the instrumentation options for tracing. + /// + /// The instrumentation options + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetInstrumentation(InstrumentationOptions instrumentation) + { + _instrumentation = instrumentation; + return this; + } + + private Action? _configure; + + /// + /// Sets an action to configure the + /// before building the . This allows for advanced + /// customization of the client. + /// + /// The configuration action + /// The builder instance for method chaining + public FirestoreConfigurationBuilder SetConfigure(Action configure) + { + _configure = configure; + return this; + } + + /// + /// Builds the FirestoreConfiguration instance with the configured options. + /// + /// A configured instance + /// Thrown if project ID or database is not set + internal FirestoreConfiguration Build() + { + if (string.IsNullOrEmpty(_projectId)) + { + throw new ConfigurationException("Project ID is null or empty"); + } + + if (string.IsNullOrEmpty(_database)) + { + throw new ConfigurationException("Database is null or empty"); + } + + return new FirestoreConfiguration(_projectId!, _database!) + { + Inbox = _inbox, + Outbox = _outbox, + Locking = _locking, + TimeProvider = _timeProvider, + Credential = _credential, + Instrumentation = _instrumentation, + Configure = _configure + }; + } +} + diff --git a/src/Fluent.Brighter.GoogleCloud/FirestoreInboxBuilder.cs b/src/Fluent.Brighter.GoogleCloud/FirestoreInboxBuilder.cs new file mode 100644 index 0000000..f096402 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/FirestoreInboxBuilder.cs @@ -0,0 +1,57 @@ +using Paramore.Brighter; +using Paramore.Brighter.Firestore; +using Paramore.Brighter.Inbox.Firestore; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Firestore inbox for Paramore.Brighter. +/// Provides methods to set Firestore configuration and connection provider for inbox operations. +/// +public sealed class FirestoreInboxBuilder +{ + private FirestoreConfiguration? _configuration; + + /// + /// Sets the Firestore configuration containing project ID, database, and collection settings. + /// + /// The Firestore configuration + /// The builder instance for method chaining + public FirestoreInboxBuilder SetConfiguration(FirestoreConfiguration configuration) + { + _configuration = configuration; + return this; + } + + + private IAmAFirestoreConnectionProvider? _firestoreConnectionProvider; + + /// + /// Sets the Firestore connection provider for managing database connections. + /// If not set, a default will be created using the configuration. + /// + /// The Firestore connection provider + /// The builder instance for method chaining + public FirestoreInboxBuilder SetConnectionProvider(IAmAFirestoreConnectionProvider connectionProvider) + { + _firestoreConnectionProvider = connectionProvider; + return this; + } + + + /// + /// Builds the FirestoreInbox instance with the configured options. + /// + /// A configured instance + /// Thrown if configuration is not set + internal FirestoreInbox Build() + { + if (_configuration == null) + { + throw new ConfigurationException("Firestore configuration is required"); + } + + _firestoreConnectionProvider ??= new FirestoreConnectionProvider(_configuration); + return new FirestoreInbox(_firestoreConnectionProvider, _configuration); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/FirestoreLockingBuilder.cs b/src/Fluent.Brighter.GoogleCloud/FirestoreLockingBuilder.cs new file mode 100644 index 0000000..0c50737 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/FirestoreLockingBuilder.cs @@ -0,0 +1,55 @@ +using System; + +using Paramore.Brighter.Firestore; +using Paramore.Brighter.Locking.Firestore; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for creating instances of . +/// +public class FirestoreLockingBuilder +{ + private IAmAFirestoreConnectionProvider? _connectionProvider; + + /// + /// Sets the Firestore connection provider. + /// + /// The connection provider to use for Firestore operations. + /// The builder instance for method chaining. + public FirestoreLockingBuilder SetConnectionProvider(IAmAFirestoreConnectionProvider connectionProvider) + { + _connectionProvider = connectionProvider; + return this; + } + + private FirestoreConfiguration? _configuration; + + /// + /// Sets the Firestore configuration. + /// + /// The configuration settings for connecting to Firestore. + /// The builder instance for method chaining. + public FirestoreLockingBuilder SetConfiguration(FirestoreConfiguration configuration) + { + _configuration = configuration; + return this; + } + + /// + /// Builds a new instance of using the configured settings. + /// + /// A configured instance. + /// Thrown when configuration is not set. + internal FirestoreDistributedLock Build() + { + if (_configuration == null) + { + throw new InvalidOperationException("Configuration must be set before building."); + } + + return _connectionProvider != null + ? new FirestoreDistributedLock(_connectionProvider, _configuration) + : new FirestoreDistributedLock(_configuration); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/FirestoreOutboxBuilder.cs b/src/Fluent.Brighter.GoogleCloud/FirestoreOutboxBuilder.cs new file mode 100644 index 0000000..7769151 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/FirestoreOutboxBuilder.cs @@ -0,0 +1,56 @@ +using Paramore.Brighter; +using Paramore.Brighter.Firestore; +using Paramore.Brighter.Outbox.Firestore; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Firestore outbox for Paramore.Brighter. +/// Provides methods to set Firestore configuration and connection provider for outbox operations. +/// +public sealed class FirestoreOutboxBuilder +{ + private FirestoreConfiguration? _configuration; + + /// + /// Sets the Firestore configuration containing project ID, database, and collection settings. + /// + /// The Firestore configuration + /// The builder instance for method chaining + public FirestoreOutboxBuilder SetConfiguration(FirestoreConfiguration configuration) + { + _configuration = configuration; + return this; + } + + + private IAmAFirestoreConnectionProvider? _firestoreConnectionProvider; + + /// + /// Sets the Firestore connection provider for managing database connections. + /// If not set, a default will be created using the configuration. + /// + /// The Firestore connection provider + /// The builder instance for method chaining + public FirestoreOutboxBuilder SetConnectionProvider(IAmAFirestoreConnectionProvider connectionProvider) + { + _firestoreConnectionProvider = connectionProvider; + return this; + } + + /// + /// Builds the FirestoreOutbox instance with the configured options. + /// + /// A configured instance + /// Thrown if configuration is not set + internal FirestoreOutbox Build() + { + if (_configuration == null) + { + throw new ConfigurationException("Firestore configuration is required"); + } + + _firestoreConnectionProvider ??= new FirestoreConnectionProvider(_configuration); + return new FirestoreOutbox(_firestoreConnectionProvider, _configuration); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/Fluent.Brighter.GoogleCloud.csproj b/src/Fluent.Brighter.GoogleCloud/Fluent.Brighter.GoogleCloud.csproj new file mode 100644 index 0000000..18620f4 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/Fluent.Brighter.GoogleCloud.csproj @@ -0,0 +1,26 @@ + + + + $(BrighterTargetFrameworks) + enable + true + Fluent Brighter Google Cloud Platform Integration + A fluent configuration builder for Paramore.Brighter's Google Cloud Platform integration. Provides type-safe, fluent APIs for configuring GCP Pub/Sub messaging, Firestore and Spanner inbox/outbox patterns, distributed locking, and GCS luggage store with Brighter's command processor and message dispatching patterns. + Brighter;GCP;GoogleCloud;PubSub;Firestore;Spanner;CloudStorage;Messaging;MessageBus;Command;Event;Dispatcher;Fluent;Configuration;Builder;Producer;Consumer;Microservices;Outbox;Inbox;DistributedLocking;CloudEvents + + + + + + + + + + + + + + + + + diff --git a/src/Fluent.Brighter.GoogleCloud/GcpConfigurator.cs b/src/Fluent.Brighter.GoogleCloud/GcpConfigurator.cs new file mode 100644 index 0000000..01ea2b1 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/GcpConfigurator.cs @@ -0,0 +1,561 @@ +using System; + +using Microsoft.Extensions.DependencyInjection; + +using Paramore.Brighter; +using Paramore.Brighter.Firestore; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Central configuration class for setting up Google Cloud Platform services in Paramore.Brighter. +/// Provides fluent methods to configure Pub/Sub for message publication and subscription, +/// Firestore and Spanner for inbox/outbox patterns, distributed locking, and GCS-based luggage storage. +/// +public sealed class GcpConfigurator +{ + private GcpMessagingGatewayConnection? _connection; + private FirestoreConfiguration? _firestoreConfiguration; + private Action _action = _ => { }; + + /// + /// Sets the GCP messaging gateway connection. + /// + /// The GCP messaging gateway connection. + /// The configurator instance for method chaining. + public GcpConfigurator SetConnection(GcpMessagingGatewayConnection connection) + { + _connection = connection; + return this; + } + + /// + /// Sets the GCP messaging gateway connection using a builder. + /// + /// The action to configure the GCP messaging gateway connection. + /// The configurator instance for method chaining. + public GcpConfigurator SetConnection(Action configure) + { + var connection = new GcpMessagingGatewayConnectionBuilder(); + configure(connection); + _connection = connection.Build(); + return this; + } + + #region Pub/Sub Publication + + /// + /// Configures Google Cloud Pub/Sub for message publication. + /// Allows configuration of topic attributes, publisher client settings, and CloudEvents metadata. + /// + /// Action to configure Pub/Sub publication settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UsePubSubPublication(pub => + /// { + /// pub.AddPublication<MyCommand>(cfg => + /// { + /// cfg.SetTopicAttributes(attrs => attrs.SetName("my-topic")) + /// .SetSource("https://example.com"); + /// }); + /// }); + /// + /// + public GcpConfigurator UsePubSubPublication(Action configure) + { + _action += fluent => fluent + .Producers(producer => producer.AddGcpPubSubPublication(cfg => + { + cfg.SetConnection(_connection!); + configure(cfg); + })); + + return this; + } + + #endregion + + #region Pub/Sub Subscription + + /// + /// Configures Google Cloud Pub/Sub subscriptions for message consumption. + /// Allows configuration of subscription properties including acknowledgment deadlines, + /// message ordering, dead letter policies, and streaming vs. pull mode. + /// + /// Action to configure Pub/Sub subscription settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UsePubSubSubscription(sub => + /// { + /// sub.AddSubscription<MyCommand>(cfg => + /// { + /// cfg.SetAckDeadlineSeconds(60) + /// .SetNoOfPerformers(5) + /// .EnableMessageOrdering(); + /// }); + /// }); + /// + /// + public GcpConfigurator UsePubSubSubscription(Action configure) + { + _action += fluent => fluent + .Subscriptions(sub => + { + var channel = new GcpPubSubChannelFactory(_connection!); + var configurator = new PubSubSubscriptionConfigurator(channel); + configure(configurator); + + sub.AddChannelFactory(channel); + foreach (var subscription in configurator.Subscriptions) + { + sub.AddSubscription(subscription); + } + }); + + return this; + } + + #endregion + + /// + /// Sets the Firestore configuration. + /// + /// The Firestore database name. + /// The configurator instance for method chaining. + /// Thrown when the Google Cloud Project ID is not set. + public GcpConfigurator SetFirestoreConfiguration(string database) + { + if (_connection == null) + { + throw new ConfigurationException("Google Cloud Project ID was not set. Use SetConfiguration() to configure."); + } + + _firestoreConfiguration = new FirestoreConfiguration(_connection.ProjectId, database) + { + Credential = _connection.Credential + }; + + return this; + } + + /// + /// Sets the Firestore configuration using a builder. + /// + /// The action to configure the Firestore configuration. + /// The configurator instance for method chaining. + /// Thrown when the Google Cloud Project ID is not set. + public GcpConfigurator SetFirestoreConfiguration(Action configure) + { + if (_connection == null) + { + throw new ConfigurationException("Google Cloud Project ID was not set. Use SetConfiguration() to configure."); + } + + var builder = new FirestoreConfigurationBuilder() + .SetProjectId(_connection.ProjectId) + .SetCredential(_connection.Credential); + configure(builder); + + _firestoreConfiguration = builder.Build(); + return this; + } + + /// + /// Sets the Firestore configuration. + /// + /// The Firestore configuration. + /// The configurator instance for method chaining. + public GcpConfigurator SetFirestoreConfiguration(FirestoreConfiguration configuration) + { + _firestoreConfiguration = configuration; + return this; + } + + #region Firestore Inbox + + /// + /// Configures Firestore as the inbox store using default settings. + /// The inbox pattern ensures idempotent message processing by tracking received messages. + /// + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreInbox(string tableName) + { + return UseFirestoreInbox(cfg => cfg.SetName(tableName)); + } + + /// + /// Configures Firestore as the inbox store with custom configuration. + /// The inbox pattern ensures idempotent message processing by tracking received messages. + /// + /// Action to configure Firestore inbox settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UseFirestoreInbox(cfg => + /// { + /// cfg.SetConfiguration(config => + /// { + /// config.SetProjectId("my-project") + /// .SetDatabase("(default)") + /// .SetInbox(inbox => inbox.SetName("inbox-collection")); + /// }); + /// }); + /// + /// + public GcpConfigurator UseFirestoreInbox(Action configure) + { + _action += fluent => fluent + .Subscriptions(sub => sub.UseFirestoreInbox(cfg => + { + cfg.SetConfiguration(c => c + .SetCredential(_firestoreConfiguration!.Credential) + .SetProjectId(_firestoreConfiguration!.ProjectId) + .SetDatabase(_firestoreConfiguration!.Database)); + configure(cfg); + })); + + return this; + } + + /// + /// Configures Firestore as the inbox store with custom configuration. + /// The inbox pattern ensures idempotent message processing by tracking received messages. + /// + /// Action to configure Firestore inbox settings + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreInbox(Action configure) + { + _action += fluent => fluent + .Subscriptions(sub => sub.UseFirestoreInbox(cfg => + { + cfg.SetConfiguration(c => c + .SetCredential(_firestoreConfiguration!.Credential) + .SetProjectId(_firestoreConfiguration!.ProjectId) + .SetDatabase(_firestoreConfiguration!.Database) + .SetInbox(configure)); + })); + + return this; + } + + #endregion + + #region Firestore Outbox + + /// + /// Configures Firestore as the outbox store using default settings. + /// The outbox pattern ensures reliable message publishing by storing messages before sending. + /// + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreOutbox(string tableName) + { + return UseFirestoreOutbox(cfg => cfg.SetName(tableName)); + } + + /// + /// Configures Firestore as the outbox store with custom configuration. + /// The outbox pattern ensures reliable message publishing by storing messages before sending. + /// + /// Action to configure Firestore outbox settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UseFirestoreOutbox(cfg => + /// { + /// cfg.SetConfiguration(config => + /// { + /// config.SetProjectId("my-project") + /// .SetDatabase("(default)") + /// .SetOutbox(outbox => outbox.SetName("outbox-collection")); + /// }); + /// }); + /// + /// + public GcpConfigurator UseFirestoreOutbox(Action configure) + { + _action += fluent => fluent + .Producers(prod => prod.UseFirestoreOutbox(cfg => + { + cfg.SetConfiguration(c => c + .SetCredential(_firestoreConfiguration!.Credential) + .SetProjectId(_firestoreConfiguration!.ProjectId) + .SetDatabase(_firestoreConfiguration!.Database)); + configure(cfg); + })); + + return this; + } + + /// + /// Configures Firestore as the outbox store with custom configuration. + /// The outbox pattern ensures reliable message publishing by storing messages before sending. + /// + /// Action to configure Firestore outbox settings + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreOutbox(Action configure) + { + _action += fluent => fluent + .Producers(prod => prod.UseFirestoreOutbox(cfg => + { + cfg.SetConfiguration(c => c + .SetCredential(_firestoreConfiguration!.Credential) + .SetProjectId(_firestoreConfiguration!.ProjectId) + .SetDatabase(_firestoreConfiguration!.Database) + .SetOutbox(configure)); + })); + + return this; + } + + #endregion + + #region Spanner Inbox + + /// + /// Configures Cloud Spanner as the inbox store with custom configuration. + /// The inbox pattern ensures idempotent message processing by tracking received messages. + /// Cloud Spanner provides strong consistency and high availability for the inbox. + /// + /// Action to configure Spanner inbox settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UseSpannerInbox(cfg => + /// { + /// cfg.SetConfiguration(config => + /// { + /// config.SetTableName("Inbox") + /// .SetContextKey("SpannerConnection"); + /// }); + /// }); + /// + /// + public GcpConfigurator UseSpannerInbox(Action configure) + { + _action += fluent => fluent + .Subscriptions(sub => sub.UseSpannerInbox(configure)); + + return this; + } + + #endregion + + #region Spanner Outbox + + /// + /// Configures Cloud Spanner as the outbox store with custom configuration. + /// The outbox pattern ensures reliable message publishing by storing messages before sending. + /// Cloud Spanner provides strong consistency and high availability for the outbox. + /// + /// Action to configure Spanner outbox settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UseSpannerOutbox(cfg => + /// { + /// cfg.SetConfiguration(config => + /// { + /// config.SetTableName("Outbox") + /// .SetContextKey("SpannerConnection"); + /// }); + /// }); + /// + /// + public GcpConfigurator UseSpannerOutbox(Action configure) + { + _action += fluent => fluent + .Producers(prod => prod.UseSpannerOutbox(configure)); + + return this; + } + + #endregion + + #region Firestore Distributed Lock + + /// + /// Configures Firestore for distributed locking with default settings. + /// Distributed locks prevent concurrent processing of the same message across multiple instances. + /// + /// The configurator instance for method chaining + /// + /// Configures Firestore for distributed locking with default settings. + /// Distributed locks prevent concurrent processing of the same message across multiple instances. + /// + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreDistributedLock(string tableName) + { + return UseFirestoreDistributedLock(cfg => cfg.SetName(tableName)); + } + + /// + /// Configures Firestore for distributed locking with custom configuration. + /// Distributed locks prevent concurrent processing of the same message across multiple instances. + /// + /// Action to configure Firestore distributed locking settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UseFirestoreDistributedLock(cfg => + /// { + /// cfg.SetProjectId("my-project") + /// .SetDatabase("(default)") + /// .SetLocking(lock => lock.SetName("locks").SetTtl(TimeSpan.FromMinutes(5))); + /// }); + /// + /// + public GcpConfigurator UseFirestoreDistributedLock(Action configure) + { + _action += fluent => fluent + .Producers(prod => prod.UseFirestoreDistributedLock(cfg => + { + cfg.SetConfiguration(c => c + .SetProjectId(_firestoreConfiguration!.ProjectId) + .SetCredential(_firestoreConfiguration!.Credential) + .SetDatabase(_firestoreConfiguration!.Database)); + configure(cfg); + })); + + return this; + } + + /// + /// Configures Firestore for distributed locking with custom configuration. + /// Distributed locks prevent concurrent processing of the same message across multiple instances. + /// + /// Action to configure Firestore distributed locking settings + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreDistributedLock(Action configure) + { + _action += fluent => fluent + .Producers(prod => prod.UseFirestoreDistributedLock(cfg => + { + cfg.SetConfiguration(c => c + .SetProjectId(_firestoreConfiguration!.ProjectId) + .SetCredential(_firestoreConfiguration!.Credential) + .SetDatabase(_firestoreConfiguration!.Database) + .SetLocking(configure)); + })); + + return this; + } + + #endregion + + #region GCS Luggage Store + + /// + /// Configures Google Cloud Storage (GCS) as the luggage store with a specific bucket name. + /// The luggage store handles large message payloads by storing them in GCS and passing references. + /// + /// Name of the GCS bucket to use + /// The configurator instance for method chaining + public GcpConfigurator UseGcsLuggageStore(string bucketName) + { + return UseGcsLuggageStore(cfg => cfg.SetBucketName(bucketName)); + } + + /// + /// Configures Google Cloud Storage (GCS) as the luggage store with custom settings. + /// The luggage store handles large message payloads by storing them in GCS and passing references. + /// + /// Action to configure GCS luggage store settings + /// The configurator instance for method chaining + /// + /// + /// configurator.UseGcsLuggageStore(cfg => + /// { + /// cfg.SetProjectId("my-project") + /// .SetBucketName("message-luggage") + /// .SetCredential(GoogleCredential.GetApplicationDefault()); + /// }); + /// + /// + public GcpConfigurator UseGcsLuggageStore(Action configure) + { + _action += fluent => fluent + .SetLuggageStore(store => store.UseGcsLuggageStore(cfg => + { + cfg + .SetCredential(_connection!.Credential) + .SetProjectId(_connection.ProjectId); + configure(cfg); + })); + + return this; + } + + #endregion + + #region Outbox Archive + + /// + /// Configures Firestore as the outbox archive store using default settings. + /// The outbox archiver moves old messages from the outbox to an archive for long-term storage. + /// + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreOutboxArchive() + { + _action += static fluent => fluent.UseFirestoreTransactionOutboxArchive(); + return this; + } + + /// + /// Configures Firestore as the outbox archive store with custom settings. + /// The outbox archiver moves old messages from the outbox to an archive for long-term storage. + /// + /// Action to configure outbox archiver options + /// The configurator instance for method chaining + public GcpConfigurator UseFirestoreOutboxArchive(Action configure) + { + _action += fluent => fluent.UseFirestoreTransactionOutboxArchive(configure); + return this; + } + + /// + /// Configures Cloud Spanner as the outbox archive store using default settings. + /// The outbox archiver moves old messages from the outbox to an archive for long-term storage. + /// + /// The configurator instance for method chaining + public GcpConfigurator UseSpannerOutboxArchive() + { + _action += static fluent => fluent.UseSpannerTransactionOutboxArchive(); + return this; + } + + /// + /// Configures Cloud Spanner as the outbox archive store with custom settings. + /// The outbox archiver moves old messages from the outbox to an archive for long-term storage. + /// + /// Action to configure outbox archiver options + /// The configurator instance for method chaining + public GcpConfigurator UseSpannerOutboxArchive(Action configure) + { + _action += fluent => fluent.UseSpannerTransactionOutboxArchive(configure); + return this; + } + + #endregion + + /// + /// Internal method to apply all configured settings to the FluentBrighterBuilder. + /// + /// The FluentBrighterBuilder to configure + /// Thrown when project ID is not set + internal void SetFluentBrighter(FluentBrighterBuilder builder) + { + if (_connection == null) + { + throw new ConfigurationException("Google Cloud Project ID was not set. Use SetProjectId() to configure."); + } + + _action(builder); + if (_firestoreConfiguration != null) + { + builder.RegisterServices(s => s.AddSingleton(_firestoreConfiguration)); + } + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/GcpMessagingGatewayConnectionBuilder.cs b/src/Fluent.Brighter.GoogleCloud/GcpMessagingGatewayConnectionBuilder.cs new file mode 100644 index 0000000..1e39452 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/GcpMessagingGatewayConnectionBuilder.cs @@ -0,0 +1,130 @@ +using System; + +using Google.Apis.Auth.OAuth2; +using Google.Cloud.PubSub.V1; +using Google.Cloud.ResourceManager.V3; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for configuring and creating a . +/// Provides a fluent interface for setting up Google Cloud Pub/Sub gateway connections. +/// +public class GcpMessagingGatewayConnectionBuilder +{ + private ICredential? _credential; + + /// + /// Sets the to use for authentication with Google Cloud Pub/Sub. + /// + /// The credential to use for authentication. If not set, default Google credential resolution will be used. + /// The builder instance for method chaining. + public GcpMessagingGatewayConnectionBuilder SetCredential(ICredential credential) + { + _credential = credential; + return this; + } + + private string _projectId = string.Empty; + + /// + /// Sets the Google Cloud project ID. + /// + /// The Google Cloud project ID. This is required for most operations. + /// The builder instance for method chaining. + public GcpMessagingGatewayConnectionBuilder SetProjectId(string projectId) + { + _projectId = projectId; + return this; + } + + private Action? _topicManagerConfiguration; + + /// + /// Sets an action to configure the used for topic management. + /// + /// Action to configure the builder for topic management (create/update/delete topics). + /// The builder instance for method chaining. + public GcpMessagingGatewayConnectionBuilder SetTopicManagerConfiguration(Action configure) + { + _topicManagerConfiguration = configure; + return this; + } + + private Action? _publisherConfiguration; + + /// + /// Sets an action to configure the used to publish messages to a topic. + /// + /// Action to configure the builder for publishing messages. + /// The builder instance for method chaining. + public GcpMessagingGatewayConnectionBuilder SetPublisherConfiguration(Action configure) + { + _publisherConfiguration = configure; + return this; + } + + private Action? _subscriptionManagerConfiguration; + + /// + /// Sets an action to configure the used for subscription management. + /// + /// Action to configure the builder for pull mode and subscription management (create/update/delete subscription). + /// The builder instance for method chaining. + public GcpMessagingGatewayConnectionBuilder SetSubscriptionManagerConfiguration(Action configure) + { + _subscriptionManagerConfiguration = configure; + return this; + } + + private Action? _streamConfiguration; + + /// + /// Sets an action to configure the used for pull mode message consumption. + /// + /// Action to configure the builder for streaming message consumption. + /// The builder instance for method chaining. + public GcpMessagingGatewayConnectionBuilder SetStreamConfiguration(Action configure) + { + _streamConfiguration = configure; + return this; + } + + private Action? _projectsClientConfiguration; + + /// + /// Sets an action to configure the used for managing projects. + /// + /// Action to configure the builder for project-level operations. + /// The builder instance for method chaining. + public GcpMessagingGatewayConnectionBuilder SetProjectsClientConfiguration(Action configure) + { + _projectsClientConfiguration = configure; + return this; + } + + /// + /// Builds the with the configured settings. + /// + /// A new instance of . + /// Thrown if the project ID is not set. + internal GcpMessagingGatewayConnection Build() + { + if (string.IsNullOrEmpty(_projectId)) + { + throw new InvalidOperationException("Project ID must be set before building the connection."); + } + + return new GcpMessagingGatewayConnection + { + Credential = _credential, + ProjectId = _projectId, + TopicManagerConfiguration = _topicManagerConfiguration, + PublisherConfiguration = _publisherConfiguration, + SubscriptionManagerConfiguration = _subscriptionManagerConfiguration, + StreamConfiguration = _streamConfiguration, + ProjectsClientConfiguration = _projectsClientConfiguration + }; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/GcpPubSubSubscriptionBuilder.cs b/src/Fluent.Brighter.GoogleCloud/GcpPubSubSubscriptionBuilder.cs new file mode 100644 index 0000000..f95c218 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/GcpPubSubSubscriptionBuilder.cs @@ -0,0 +1,544 @@ +using System; + +using Google.Cloud.PubSub.V1; +using Google.Protobuf.Collections; +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +using DeadLetterPolicy = Paramore.Brighter.MessagingGateway.GcpPubSub.DeadLetterPolicy; +using SubscriptionName = Paramore.Brighter.SubscriptionName; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Google Cloud Pub/Sub subscription in Paramore.Brighter. +/// Provides methods to set various properties for message consumption from Google Cloud Pub/Sub, +/// including subscription naming, channel configuration, message handling behavior, +/// error handling, and GCP-specific features like dead letter policies and message ordering. +/// +public sealed class GcpPubSubSubscriptionBuilder +{ + private SubscriptionName? _subscriptionName; + + /// + /// Sets the name of the subscription, which uniquely identifies this consumer. + /// + /// The name of the subscription + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetSubscriptionName(SubscriptionName subscriptionName) + { + _subscriptionName = subscriptionName; + if (ChannelName.IsNullOrEmpty(_channelName)) + { + _channelName = new ChannelName(_subscriptionName.Value); + } + + return this; + } + + private ChannelName? _channelName; + + /// + /// Sets the channel name for the subscription. + /// In GCP Pub/Sub, this represents the subscription name in the platform. + /// + /// The name of the channel + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetSubscription(ChannelName channelName) + { + _channelName = channelName; + if (_subscriptionName == null) + { + _subscriptionName = new SubscriptionName(channelName.Value); + } + + return this; + } + + private RoutingKey? _routingKey; + + /// + /// Sets the topic/routing key for the subscription. + /// This is the Pub/Sub topic that the subscription will consume messages from. + /// + /// The topic/routing key + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetTopic(RoutingKey routingKey) + { + _routingKey = routingKey; + return this; + } + + private Type? _requestType; + + /// + /// Sets the .NET type of the request message being consumed and automatically + /// derives subscription name, channel name, and routing key if not explicitly set. + /// + /// The type of the message request + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetDataType(Type? requestType) + { + _requestType = requestType; + if (requestType == null) + { + return this; + } + + if (_subscriptionName == null) + { + _subscriptionName = new SubscriptionName(requestType.Name); + } + + if (ChannelName.IsNullOrEmpty(_channelName)) + { + _channelName = new ChannelName(requestType.Name); + } + + if (RoutingKey.IsNullOrEmpty(_routingKey)) + { + _routingKey = new RoutingKey(requestType.Name); + } + + return this; + } + + private Func? _getRequestType; + + /// + /// Sets a function to dynamically determine the request type from a message, + /// useful for polymorphic message handling. + /// + /// Function to extract the request type from a message + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetGetRequestType(Func? getRequestType) + { + _getRequestType = getRequestType; + return this; + } + + private int _bufferSize = 1; + + /// + /// Sets the buffer size for the message pump, which controls how many messages + /// are pre-fetched from the subscription. + /// + /// The number of messages to buffer + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetBufferSize(int bufferSize) + { + _bufferSize = bufferSize; + return this; + } + + private int _noOfPerformers = 1; + + /// + /// Sets the number of concurrent performers (threads/processes) that will + /// process messages from this subscription. + /// + /// The number of concurrent performers + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetNoOfPerformers(int noOfPerformers) + { + _noOfPerformers = noOfPerformers; + return this; + } + + private TimeSpan? _timeOut; + + /// + /// Sets the timeout for receive message operations from the Pub/Sub subscription. + /// + /// The receive message timeout + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetTimeOut(TimeSpan? timeOut) + { + _timeOut = timeOut; + return this; + } + + private int _requeueCount = -1; + + /// + /// Sets the maximum number of times a message can be requeued after failed processing. + /// A value of -1 indicates unlimited requeues. + /// + /// The maximum number of requeue attempts + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetRequeueCount(int requeueCount) + { + _requeueCount = requeueCount; + return this; + } + + private TimeSpan? _requeueDelay; + + /// + /// Sets the minimum delay before a requeued message becomes visible again for processing. + /// This works in conjunction with MaxRequeueDelay for exponential backoff. + /// + /// The minimum delay before requeued messages become visible + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetRequeueDelay(TimeSpan? requeueDelay) + { + _requeueDelay = requeueDelay; + return this; + } + + private int _unacceptableMessageLimit; + + /// + /// Sets the limit for consecutive unacceptable messages before the channel is terminated. + /// + /// The maximum number of unacceptable messages + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetUnacceptableMessageLimit(int unacceptableMessageLimit) + { + _unacceptableMessageLimit = unacceptableMessageLimit; + return this; + } + + private MessagePumpType _messagePumpType = MessagePumpType.Proactor; + + /// + /// Sets the message pump type (Proactor for async or Reactor for sync) which determines + /// how messages are processed. + /// + /// The message pump type + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetMessagePumpType(MessagePumpType messagePumpType) + { + _messagePumpType = messagePumpType; + return this; + } + + private IAmAChannelFactory? _channelFactory; + + /// + /// Sets a custom channel factory for creating channels to the message queue. + /// + /// The channel factory implementation + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetChannelFactory(IAmAChannelFactory? channelFactory) + { + _channelFactory = channelFactory; + return this; + } + + private OnMissingChannel _makeChannels = OnMissingChannel.Create; + + /// + /// Sets the channel creation behavior when a subscription/topic doesn't exist. + /// + /// Policy for channel creation (validate, create, or assume) + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetMakeChannels(OnMissingChannel makeChannels) + { + _makeChannels = makeChannels; + return this; + } + + private TimeSpan? _emptyChannelDelay; + + /// + /// Sets the delay when an empty channel is encountered before checking for new messages. + /// + /// The delay when channel is empty + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetEmptyChannelDelay(TimeSpan? emptyChannelDelay) + { + _emptyChannelDelay = emptyChannelDelay; + return this; + } + + private TimeSpan? _channelFailureDelay; + + /// + /// Sets the delay after a channel failure before attempting to reconnect. + /// + /// The delay after channel failure + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetChannelFailureDelay(TimeSpan? channelFailureDelay) + { + _channelFailureDelay = channelFailureDelay; + return this; + } + + private string? _projectId; + + /// + /// Sets the Google Cloud Project ID where the subscription and its topic reside. + /// If null, the default project ID from the connection will be used. + /// + /// The Google Cloud Project ID + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetProjectId(string? projectId) + { + _projectId = projectId; + return this; + } + + private TopicAttributes? _topicAttributes; + + /// + /// Sets the attributes for the associated Google Cloud Pub/Sub Topic. + /// This is used for Topic creation and configuration during infrastructure setup. + /// + /// The topic attributes configuration + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetTopicAttributes(TopicAttributes? topicAttributes) + { + _topicAttributes = topicAttributes; + return this; + } + + private int _ackDeadlineSeconds = 30; + + /// + /// Sets the acknowledgment deadline in seconds for the subscription. + /// This is the time the subscriber has to acknowledge a message before Pub/Sub redelivers it. + /// + /// The acknowledgment deadline in seconds + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetAckDeadlineSeconds(int ackDeadlineSeconds) + { + _ackDeadlineSeconds = ackDeadlineSeconds; + return this; + } + + private bool _retainAckedMessages; + + /// + /// Sets whether Pub/Sub should retain acknowledged messages. + /// This is typically used for replay functionality. + /// + /// True to retain acknowledged messages + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetRetainAckedMessages(bool retainAckedMessages) + { + _retainAckedMessages = retainAckedMessages; + return this; + } + + private TimeSpan? _messageRetentionDuration; + + /// + /// Sets the duration for which Pub/Sub retains messages published to the topic. + /// This setting is applied to the subscription during creation/update. + /// + /// The message retention duration + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetMessageRetentionDuration(TimeSpan? messageRetentionDuration) + { + _messageRetentionDuration = messageRetentionDuration; + return this; + } + + private MapField? _labels; + + /// + /// Sets a collection of key-value pairs that are attached to the subscription as labels. + /// Labels are used for organization, billing, and resource management. + /// + /// Dictionary of label key-value pairs + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetLabels(MapField? labels) + { + _labels = labels; + return this; + } + + /// + /// Adds a single label to the subscription. + /// + /// The label key + /// The label value + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder AddLabel(string key, string value) + { + _labels ??= new MapField(); + _labels[key] = value; + return this; + } + + private bool _enableMessageOrdering; + + /// + /// Sets whether messages published to the topic are delivered in the order they were published, + /// provided they were published with an ordering key. + /// + /// True to enable message ordering + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetEnableMessageOrdering(bool enableMessageOrdering) + { + _enableMessageOrdering = enableMessageOrdering; + return this; + } + + private bool _enableExactlyOnceDelivery; + + /// + /// Sets whether exactly-once delivery is enabled for the subscription. + /// + /// True to enable exactly-once delivery + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetEnableExactlyOnceDelivery(bool enableExactlyOnceDelivery) + { + _enableExactlyOnceDelivery = enableExactlyOnceDelivery; + return this; + } + + private CloudStorageConfig? _storage; + + /// + /// Sets the configuration for forwarding message snapshots to a Google Cloud Storage bucket. + /// This is used for data export/backup functionality. + /// + /// The cloud storage configuration + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetStorage(CloudStorageConfig? storage) + { + _storage = storage; + return this; + } + + private ExpirationPolicy? _expirationPolicy; + + /// + /// Sets the subscription's expiration policy. + /// If set, the subscription will automatically be deleted after a period of inactivity. + /// + /// The expiration policy + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetExpirationPolicy(ExpirationPolicy? expirationPolicy) + { + _expirationPolicy = expirationPolicy; + return this; + } + + private DeadLetterPolicy? _deadLetter; + + /// + /// Sets the configuration for the Dead Letter Policy (DLQ). + /// If set, messages that fail processing will be forwarded to a specified dead letter topic. + /// + /// The dead letter policy + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetDeadLetter(DeadLetterPolicy? deadLetter) + { + _deadLetter = deadLetter; + return this; + } + + private TimeSpan? _maxRequeueDelay; + + /// + /// Sets the maximum delay time for exponential backoff retry policy when a message is requeued. + /// This works in conjunction with RequeueDelay which is the minimum backoff. + /// + /// The maximum requeue delay + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetMaxRequeueDelay(TimeSpan? maxRequeueDelay) + { + _maxRequeueDelay = maxRequeueDelay; + return this; + } + + private TimeProvider? _timeProvider; + + /// + /// Sets the time provider used for time-related operations (e.g., purging). + /// + /// The time provider + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetTimeProvider(TimeProvider? timeProvider) + { + _timeProvider = timeProvider; + return this; + } + + private SubscriptionMode _subscriptionMode = SubscriptionMode.Stream; + + /// + /// Sets the message consumption mode: Stream (default) or Pull. + /// + /// The subscription mode + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetSubscriptionMode(SubscriptionMode subscriptionMode) + { + _subscriptionMode = subscriptionMode; + return this; + } + + private Action? _streamingConfiguration; + + /// + /// Sets an action to configure the SubscriberClientBuilder used for the streaming consumer. + /// This allows for advanced customization of the underlying streaming client configuration. + /// + /// Action to configure the SubscriberClientBuilder + /// The builder instance for method chaining + public GcpPubSubSubscriptionBuilder SetStreamingConfiguration(Action? streamingConfiguration) + { + _streamingConfiguration = streamingConfiguration; + return this; + } + + /// + /// Builds a GcpPubSubSubscription instance with the configured values. + /// + /// A configured GcpPubSubSubscription instance + /// Thrown when required configuration is missing + internal GcpPubSubSubscription Build() + { + if (_subscriptionName == null) + { + throw new ConfigurationException("Subscription name not set"); + } + + if (_channelName == null) + { + throw new ConfigurationException("Channel name not set"); + } + + if (_routingKey == null) + { + throw new ConfigurationException("Routing key not set"); + } + + return new GcpPubSubSubscription( + subscriptionName: _subscriptionName, + channelName: _channelName, + routingKey: _routingKey, + requestType: _requestType, + getRequestType: _getRequestType, + bufferSize: _bufferSize, + noOfPerformers: _noOfPerformers, + timeOut: _timeOut, + requeueCount: _requeueCount, + requeueDelay: _requeueDelay, + unacceptableMessageLimit: _unacceptableMessageLimit, + messagePumpType: _messagePumpType, + channelFactory: _channelFactory, + makeChannels: _makeChannels, + emptyChannelDelay: _emptyChannelDelay, + channelFailureDelay: _channelFailureDelay, + projectId: _projectId, + topicAttributes: _topicAttributes, + ackDeadlineSeconds: _ackDeadlineSeconds, + retainAckedMessages: _retainAckedMessages, + messageRetentionDuration: _messageRetentionDuration, + labels: _labels, + enableMessageOrdering: _enableMessageOrdering, + enableExactlyOnceDelivery: _enableExactlyOnceDelivery, + storage: _storage, + expirationPolicy: _expirationPolicy, + deadLetter: _deadLetter, + maxRequeueDelay: _maxRequeueDelay, + timeProvider: _timeProvider, + subscriptionMode: _subscriptionMode, + streamingConfiguration: _streamingConfiguration + ); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/GcpPublicationBuilder.cs b/src/Fluent.Brighter.GoogleCloud/GcpPublicationBuilder.cs new file mode 100644 index 0000000..5cf1b49 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/GcpPublicationBuilder.cs @@ -0,0 +1,217 @@ +using System; +using System.Collections.Generic; + +using Google.Cloud.PubSub.V1; +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Google Cloud Pub/Sub publication in Paramore.Brighter. +/// Provides methods to set topic attributes, publisher client configuration, data schema, +/// channel creation behavior, and message metadata. +/// +public sealed class GcpPublicationBuilder +{ + private TopicAttributes? _topicAttributes; + + /// + /// Sets the attributes for the Google Cloud Pub/Sub Topic. + /// This includes settings such as the ProjectId, Topic Name, and potentially more advanced configurations. + /// + /// The topic attributes configuration + /// The builder instance for method chaining + public GcpPublicationBuilder SetTopicAttributes(TopicAttributes? topicAttributes) + { + _topicAttributes = topicAttributes; + return this; + } + + private Action? _publisherClientConfiguration; + + /// + /// Sets an action to allow advanced customization of the PublisherClientBuilder. + /// This is used to configure the client that publishes messages to the topic, for scenarios like + /// setting custom client options, retries, or deadlines. + /// + /// Action to configure the PublisherClientBuilder + /// The builder instance for method chaining + public GcpPublicationBuilder SetPublisherClientConfiguration(Action? publisherClientConfiguration) + { + _publisherClientConfiguration = publisherClientConfiguration; + return this; + } + + private Uri? _dataSchema; + + /// + /// Sets the URI of the data schema for CloudEvents metadata. + /// Identifies the schema that data adheres to. Incompatible changes to the schema should be reflected by a different URI. + /// + /// URI pointing to the event data schema + /// The builder instance for method chaining + public GcpPublicationBuilder SetDataSchema(Uri? dataSchema) + { + _dataSchema = dataSchema; + return this; + } + + private OnMissingChannel _makeChannels; + + /// + /// Sets what to do with infrastructure dependencies for the producer. + /// Controls whether to create, validate, or assume the existence of the topic. + /// + /// Policy for channel creation (validate, create, or assume) + /// The builder instance for method chaining + public GcpPublicationBuilder SetMakeChannels(OnMissingChannel makeChannels) + { + _makeChannels = makeChannels; + return this; + } + + private Type? _requestType; + + /// + /// Sets the type of the request that we expect to publish on this channel. + /// + /// The type of the message request + /// The builder instance for method chaining + public GcpPublicationBuilder SetRequestType(Type? requestType) + { + _requestType = requestType; + return this; + } + + private Uri _source = new Uri("http://goparamore.io"); + + /// + /// Sets the source URI for CloudEvents metadata. + /// Identifies the context in which an event happened. Often this will include information such as the type of + /// the event source, the organization publishing the event or the process that produced the event. + /// Producers MUST ensure that source + id is unique for each distinct event. + /// + /// URI identifying the event source + /// The builder instance for method chaining + public GcpPublicationBuilder SetSource(Uri source) + { + _source = source; + return this; + } + + private string? _subject; + + /// + /// Sets the subject of the event in the context of the event producer. + /// In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, + /// but the source identifier alone might not be sufficient as a qualifier for any specific event if the + /// source context has internal sub-structure. + /// + /// The event subject + /// The builder instance for method chaining + public GcpPublicationBuilder SetSubject(string? subject) + { + _subject = subject; + return this; + } + + private RoutingKey? _topic; + + /// + /// Sets the topic this publication is for defined by a RoutingKey. + /// In a pub-sub scenario there is typically a topic, to which we publish and then a subscriber creates its own + /// queue which the broker delivers messages to. + /// + /// The topic name or routing key + /// The builder instance for method chaining + public GcpPublicationBuilder SetTopic(RoutingKey topic) + { + _topic = topic; + return this; + } + + private CloudEventsType _type = CloudEventsType.Empty; + + /// + /// Sets the CloudEvents type metadata for message classification. + /// This attribute contains a value describing the type of event related to the originating occurrence. + /// Often this attribute is used for routing, observability, policy enforcement, etc. + /// Should be prefixed with a reverse-DNS name. + /// + /// The CloudEvents type specification + /// The builder instance for method chaining + public GcpPublicationBuilder SetType(CloudEventsType type) + { + _type = type; + return this; + } + + private IDictionary? _defaultHeaders; + + /// + /// Sets the default headers to be included in published messages when using default message mappers. + /// These headers will be automatically added to all messages published through Brighter's message producers. + /// + /// Dictionary of header names and values + /// The builder instance for method chaining + public GcpPublicationBuilder SetDefaultHeaders(IDictionary? defaultHeaders) + { + _defaultHeaders = defaultHeaders; + return this; + } + + private IDictionary? _cloudEventsAdditionalProperties; + + /// + /// Sets a dictionary of additional properties related to CloudEvents. + /// This property enables the inclusion of custom or vendor-specific metadata beyond the standard CloudEvents attributes. + /// These properties are serialized alongside the core CloudEvents attributes when mapping to a CloudEvent message. + /// + /// Extended CloudEvents properties + /// The builder instance for method chaining + public GcpPublicationBuilder SetCloudEventsAdditionalProperties( + IDictionary? cloudEventsAdditionalProperties) + { + _cloudEventsAdditionalProperties = cloudEventsAdditionalProperties; + return this; + } + + private string? _replyTo; + + /// + /// Sets the reply to topic. Used when doing Request-Reply instead of Publish-Subscribe to identify + /// the queue that the sender is listening on. Usually a sender listens on a private queue, so that they + /// do not have to filter replies intended for other listeners. + /// + /// The reply endpoint address + /// The builder instance for method chaining + public GcpPublicationBuilder SetReplyTo(string? replyTo) + { + _replyTo = replyTo; + return this; + } + + /// + /// Builds a GcpPublication instance with the configured values. + /// + /// A configured GcpPublication instance + internal GcpPublication Build() + { + return new GcpPublication + { + TopicAttributes = _topicAttributes, + PublisherClientConfiguration = _publisherClientConfiguration, + DataSchema = _dataSchema, + MakeChannels = _makeChannels, + RequestType = _requestType, + Source = _source, + Subject = _subject, + Topic = _topic, + Type = _type, + DefaultHeaders = _defaultHeaders, + CloudEventsAdditionalProperties = _cloudEventsAdditionalProperties, + ReplyTo = _replyTo + }; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/GcpPublicationFactoryBuilder.cs b/src/Fluent.Brighter.GoogleCloud/GcpPublicationFactoryBuilder.cs new file mode 100644 index 0000000..5093230 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/GcpPublicationFactoryBuilder.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +using Paramore.Brighter.MessagingGateway.GcpPubSub; +using Paramore.Brighter.Observability; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for configuring and creating a . +/// Provides a fluent interface for setting up Google Cloud Pub/Sub message producer configurations. +/// +public class GcpPublicationFactoryBuilder +{ + private GcpMessagingGatewayConnection? _connection; + + /// + /// Sets the connection details for the Google Cloud Pub/Sub gateway. + /// + /// The connection configuration including credentials and project settings. + /// The builder instance for method chaining. + public GcpPublicationFactoryBuilder SetConnection(GcpMessagingGatewayConnection connection) + { + _connection = connection; + return this; + } + + private List _publications = new(); + + /// + /// Sets the collection of Google Cloud Pub/Sub specific publication configurations. + /// Replaces any previously configured publications. + /// + /// The publications to configure. + /// The builder instance for method chaining. + public GcpPublicationFactoryBuilder SetPublications(params GcpPublication[] publications) + { + _publications = publications.ToList(); + return this; + } + + /// + /// Adds a single publication to the collection. + /// Can be called multiple times to add multiple publications. + /// + /// The publication to add. + /// The builder instance for method chaining. + public GcpPublicationFactoryBuilder AddPublication(GcpPublication publication) + { + _publications.Add(publication); + return this; + } + + private InstrumentationOptions? _instrumentation; + + /// + /// Sets the instrumentation options for tracing and metrics. + /// + /// The instrumentation options to enable observability features. + /// The builder instance for method chaining. + public GcpPublicationFactoryBuilder SetInstrumentation(InstrumentationOptions instrumentation) + { + _instrumentation = instrumentation; + return this; + } + + /// + /// Builds the with the configured settings. + /// + /// A new instance of . + /// Thrown if connection is not set. + internal GcpPubSubMessageProducerFactory Build() + { + if (_connection == null) + { + throw new InvalidOperationException("Connection must be set before building the factory."); + } + + return new GcpPubSubMessageProducerFactory(_connection, _publications, _instrumentation); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/GcsLuggageStoreBuilder.cs b/src/Fluent.Brighter.GoogleCloud/GcsLuggageStoreBuilder.cs new file mode 100644 index 0000000..cb44d54 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/GcsLuggageStoreBuilder.cs @@ -0,0 +1,229 @@ +using System; + +using Google.Apis.Auth.OAuth2; +using Google.Apis.Storage.v1.Data; +using Google.Cloud.Storage.V1; + +using Paramore.Brighter; +using Paramore.Brighter.Transformers.Gcp; +using Paramore.Brighter.Transforms.Storage; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring a Google Cloud Storage (GCS) luggage store for Paramore.Brighter's large message handling. +/// Provides methods to set credentials, bucket configuration, and storage options for storing large messages (luggage) in GCS. +/// +public sealed class GcsLuggageStoreBuilder +{ + private string _projectId = string.Empty; + + /// + /// Sets the Google Cloud project ID where the GCS bucket is located. + /// + /// The Google Cloud project ID + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetProjectId(string projectId) + { + _projectId = projectId; + return this; + } + + private Bucket _bucket = new(); + + /// + /// Sets the GCS bucket configuration object directly, allowing full customization of bucket properties. + /// + /// The GCS bucket configuration + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetBucket(Bucket bucket) + { + _bucket = bucket; + return this; + } + + + /// + /// Sets the name of the GCS bucket where luggage (large messages) will be stored. + /// + /// The name of the GCS bucket + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetBucketName(string bucketName) + { + _bucket.Name = bucketName; + return this; + } + + private ICredential? _credential; + + /// + /// Sets the Google Cloud credential to use for authentication when accessing the storage bucket. + /// + /// The Google Cloud credential + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetCredential(ICredential? credential) + { + _credential = credential; + return this; + } + + private DownloadObjectOptions? _downloadObjectOptions; + + /// + /// Sets options for downloading object content when calling or . + /// Controls download behavior including buffer size and cancellation tokens. + /// + /// + /// Affects stream buffering and progress tracking during payload retrieval operations. + /// + /// The download object options + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetDownloadObjectOptions(DownloadObjectOptions options) + { + _downloadObjectOptions = options; + return this; + } + + private UploadObjectOptions? _uploadObjectOptions; + + /// + /// Sets options for uploading objects when calling or . + /// Allows configuration of upload behavior including content encoding and predefined ACL settings. + /// + /// + /// Used in all operations to control upload resumability and metadata settings. + /// + /// The upload object options + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetUploadObjectOptions(UploadObjectOptions options) + { + _uploadObjectOptions = options; + return this; + } + + private CreateBucketOptions? _createBucketOptions; + + /// + /// Sets options for creating buckets when calling . + /// Controls bucket creation behavior including predefined ACLs and projection settings. + /// + /// The create bucket options + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetCreateBucket(CreateBucketOptions options) + { + _createBucketOptions = options; + return this; + } + + private GetBucketOptions? _getBucketOptions; + + /// + /// Sets options for retrieving bucket metadata when calling . + /// Controls what bucket information is returned. + /// + /// The get bucket options + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetGetBucketOptions(GetBucketOptions options) + { + _getBucketOptions = options; + return this; + } + + private DeleteObjectOptions? _deleteObjectOptions; + + /// + /// Sets options for deleting objects when calling . + /// Controls deletion behavior including generation matching for safe deletion. + /// + /// The delete object options + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetDeleteObjectOptions(DeleteObjectOptions options) + { + _deleteObjectOptions = options; + return this; + } + + private GetObjectOptions? _getObjectOptions; + + /// + /// Sets options for retrieving object metadata when calling . + /// Controls what object information is returned including generation and projection settings. + /// + /// The get object options + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetGetObjectOptions(GetObjectOptions options) + { + _getObjectOptions = options; + return this; + } + + private string _prefix = string.Empty; + + /// + /// Sets the prefix for luggage objects stored in GCS, which helps organize + /// and identify Brighter-specific objects in the bucket. + /// + /// The prefix for luggage objects + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetPrefix(string prefix) + { + _prefix = prefix; + return this; + } + + private StorageStrategy _strategy = StorageStrategy.Assume; + + /// + /// Sets the storage strategy for dealing with missing buckets (Assume, Validate, or Create). + /// + /// The storage strategy + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetStrategy(StorageStrategy strategy) + { + _strategy = strategy; + return this; + } + + private Action? _clientBuilderConfigurator; + + /// + /// Sets a callback to customize the before client creation. + /// Allows advanced configuration of retry policies, scopes, or endpoint settings. + /// + /// + /// builder.SetClientBuilderConfigurator(clientBuilder => + /// clientBuilder.Scopes = new[] { "https://www.googleapis.com/auth/cloud-platform" }); + /// + /// The configurator action + /// The builder instance for method chaining + public GcsLuggageStoreBuilder SetClientBuilderConfigurator(Action configurator) + { + _clientBuilderConfigurator = configurator; + return this; + } + + /// + /// Builds the GcsLuggageStore instance with the configured options. + /// + /// A configured instance + /// Thrown if bucket name is not set + internal GcsLuggageStore Build() + { + return new GcsLuggageStore( + new GcsLuggageOptions + { + Bucket = _bucket, + ClientBuilderConfigurator = _clientBuilderConfigurator, + Credential = _credential, + CreateBucketOptions = _createBucketOptions, + DeleteObjectOptions = _deleteObjectOptions, + DownloadObjectOptions = _downloadObjectOptions, + GetBucketOptions = _getBucketOptions, + GetObjectOptions = _getObjectOptions, + Prefix = _prefix, + ProjectId = _projectId, + UploadObjectOptions = _uploadObjectOptions, + Strategy = _strategy + }); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/PubSubSubscriptionConfigurator.cs b/src/Fluent.Brighter.GoogleCloud/PubSubSubscriptionConfigurator.cs new file mode 100644 index 0000000..9353275 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/PubSubSubscriptionConfigurator.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Generic; + +using Paramore.Brighter; +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Configurator class for managing multiple Google Cloud Pub/Sub subscriptions. +/// Provides methods to add and configure subscriptions using fluent API patterns. +/// +/// The GCP Pub/Sub channel factory used to create channels for all subscriptions +public class PubSubSubscriptionConfigurator(GcpPubSubChannelFactory channelFactory) +{ + /// + /// Gets the internal list of configured GCP Pub/Sub subscriptions. + /// + internal List Subscriptions { get; } = []; + + /// + /// Adds a pre-configured GCP Pub/Sub subscription to the configurator. + /// + /// The subscription instance to add + /// The configurator instance for method chaining + public PubSubSubscriptionConfigurator AddSubscription(GcpPubSubSubscription subscription) + { + Subscriptions.Add(subscription); + return this; + } + + /// + /// Adds a new GCP Pub/Sub subscription by configuring it using a fluent builder. + /// The channel factory is automatically set on the builder. + /// + /// An action to configure the subscription using the builder + /// The configurator instance for method chaining + /// + /// + /// configurator.AddSubscription(sub => + /// { + /// sub.SetChannelName(new ChannelName("my-subscription")) + /// .SetRoutingKey(new RoutingKey("my-topic")) + /// .SetAckDeadlineSeconds(60); + /// }); + /// + /// + public PubSubSubscriptionConfigurator AddSubscription(Action configure) + { + var sub = new GcpPubSubSubscriptionBuilder(); + sub.SetChannelFactory(channelFactory); + configure(sub); + return AddSubscription(sub.Build()); + } + + /// + /// Adds a new GCP Pub/Sub subscription for a specific request type using a fluent builder. + /// The data type is automatically set based on the generic parameter, and subscription name, + /// channel name, and routing key are derived from the type name if not explicitly configured. + /// The channel factory is automatically set on the builder. + /// + /// The type of request message this subscription will handle + /// An action to configure the subscription using the builder + /// The configurator instance for method chaining + /// + /// + /// configurator.AddSubscription<MyCommand>(sub => + /// { + /// sub.SetAckDeadlineSeconds(60) + /// .SetNoOfPerformers(5); + /// }); + /// + /// + public PubSubSubscriptionConfigurator AddSubscription(Action configure) + where TRequest : class, IRequest + { + return AddSubscription(cfg => + { + cfg.SetDataType(typeof(TRequest)); + configure(cfg); + }); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/SpannerInboxBuilder.cs b/src/Fluent.Brighter.GoogleCloud/SpannerInboxBuilder.cs new file mode 100644 index 0000000..d470908 --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/SpannerInboxBuilder.cs @@ -0,0 +1,53 @@ +using Paramore.Brighter; +using Paramore.Brighter.Inbox.Spanner; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for creating instances of . +/// Provides a fluent API for configuring Google Cloud Spanner inbox storage. +/// +public sealed class SpannerInboxBuilder +{ + private IAmARelationalDatabaseConfiguration? _configuration; + + /// + /// Sets the relational database configuration for the Spanner inbox. + /// + /// The database configuration containing connection settings and table names. + /// The builder instance for method chaining. + public SpannerInboxBuilder SetConfiguration(IAmARelationalDatabaseConfiguration configuration) + { + _configuration = configuration; + return this; + } + + private IAmARelationalDbConnectionProvider? _connectionProvider; + + /// + /// Sets the relational database connection provider for the Spanner inbox. + /// + /// The connection provider to use for managing database connections. + /// The builder instance for method chaining. + public SpannerInboxBuilder SetConnectionProvider(IAmARelationalDbConnectionProvider connectionProvider) + { + _connectionProvider = connectionProvider; + return this; + } + + /// + /// Builds a new instance of using the configured settings. + /// + /// A configured instance. + /// Thrown when configuration is not set. + internal SpannerInboxAsync Build() + { + if (_configuration == null) + { + throw new ConfigurationException("Configuration is null"); + } + + return _connectionProvider == null ? new SpannerInboxAsync(_configuration) + : new SpannerInboxAsync(_configuration, _connectionProvider); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/SpannerOutboxBuilder.cs b/src/Fluent.Brighter.GoogleCloud/SpannerOutboxBuilder.cs new file mode 100644 index 0000000..f64ed3c --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/SpannerOutboxBuilder.cs @@ -0,0 +1,53 @@ +using Paramore.Brighter; +using Paramore.Brighter.Outbox.Spanner; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for creating instances of . +/// Provides a fluent API for configuring Google Cloud Spanner outbox storage. +/// +public sealed class SpannerOutboxBuilder +{ + private IAmARelationalDatabaseConfiguration? _configuration; + + /// + /// Sets the relational database configuration for the Spanner outbox. + /// + /// The database configuration containing connection settings and table names. + /// The builder instance for method chaining. + public SpannerOutboxBuilder SetConfiguration(IAmARelationalDatabaseConfiguration configuration) + { + _configuration = configuration; + return this; + } + + private IAmARelationalDbConnectionProvider? _connectionProvider; + + /// + /// Sets the relational database connection provider for the Spanner outbox. + /// + /// The connection provider to use for managing database connections. + /// The builder instance for method chaining. + public SpannerOutboxBuilder SetConnectionProvider(IAmARelationalDbConnectionProvider connectionProvider) + { + _connectionProvider = connectionProvider; + return this; + } + + /// + /// Builds a new instance of using the configured settings. + /// + /// A configured instance. + /// Thrown when configuration is not set. + internal SpannerOutbox Build() + { + if (_configuration == null) + { + throw new ConfigurationException("Configuration is null"); + } + + return _connectionProvider == null ? new SpannerOutbox(_configuration) + : new SpannerOutbox(_configuration, _connectionProvider); + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.GoogleCloud/TopicAttributeBuilder.cs b/src/Fluent.Brighter.GoogleCloud/TopicAttributeBuilder.cs new file mode 100644 index 0000000..1ec1b6d --- /dev/null +++ b/src/Fluent.Brighter.GoogleCloud/TopicAttributeBuilder.cs @@ -0,0 +1,178 @@ +using System; +using System.Collections.Generic; + +using Google.Cloud.PubSub.V1; +using Google.Protobuf.WellKnownTypes; + +using Paramore.Brighter.MessagingGateway.GcpPubSub; + +namespace Fluent.Brighter.GoogleCloud; + +/// +/// Builder class for fluently configuring Google Cloud Pub/Sub topic attributes. +/// Provides methods to set topic properties including retention, labels, schema settings, +/// and encryption configuration. +/// +public sealed class TopicAttributeBuilder +{ + private string _name = string.Empty; + + /// + /// Sets the name of the Topic. This is the resource identifier within the Project. + /// If not set, it defaults to the Brighter Publication Topic name. + /// + /// The topic name + /// The builder instance for method chaining + public TopicAttributeBuilder SetName(string name) + { + _name = name; + return this; + } + + private string? _projectId; + + /// + /// Sets the Google Cloud Project ID where the Topic should be located. + /// If null, the GcpMessagingGatewayConnection ProjectId will be used. + /// + /// The Google Cloud Project ID + /// The builder instance for method chaining + public TopicAttributeBuilder SetProjectId(string? projectId) + { + _projectId = projectId; + return this; + } + + private Dictionary _labels = new(); + + /// + /// Sets a dictionary of key-value pairs that are attached to the Topic as labels. + /// Labels are typically used for organization, billing, and resource management. + /// + /// Dictionary of label key-value pairs + /// The builder instance for method chaining + public TopicAttributeBuilder SetLabels(Dictionary labels) + { + _labels = labels; + return this; + } + + /// + /// Adds a single label to the Topic. + /// + /// The label key + /// The label value + /// The builder instance for method chaining + public TopicAttributeBuilder AddLabel(string key, string value) + { + _labels[key] = value; + return this; + } + + private TimeSpan? _messageRetentionDuration; + + /// + /// Sets the duration for which Pub/Sub retains messages published to the topic. + /// If null, messages are retained for 7 days (the default maximum). + /// Note: The value must be at least 10 minutes (600 seconds). + /// + /// The message retention duration + /// The builder instance for method chaining + public TopicAttributeBuilder SetMessageRetentionDuration(TimeSpan? duration) + { + _messageRetentionDuration = duration; + return this; + } + + private MessageStoragePolicy? _storePolicy; + + /// + /// Sets the message storage policy configuration for the Topic. + /// This defines which Google Cloud regions are allowed to store messages for this topic. + /// + /// The message storage policy + /// The builder instance for method chaining + public TopicAttributeBuilder SetStorePolicy(MessageStoragePolicy? storePolicy) + { + _storePolicy = storePolicy; + return this; + } + + private SchemaSettings? _schemaSettings; + + /// + /// Sets the schema settings for the Topic. + /// This is used to enforce a specific schema (like Avro or Protocol Buffers) on published messages. + /// + /// The schema settings + /// The builder instance for method chaining + public TopicAttributeBuilder SetSchemaSettings(SchemaSettings? schemaSettings) + { + _schemaSettings = schemaSettings; + return this; + } + + private string? _kmsKeyName; + + /// + /// Sets the Cloud KMS key name that is used to encrypt and decrypt messages published to the topic. + /// This enables Customer-Managed Encryption Keys (CMEK). + /// + /// The KMS key name + /// The builder instance for method chaining + public TopicAttributeBuilder SetKmsKeyName(string? kmsKeyName) + { + _kmsKeyName = kmsKeyName; + return this; + } + + private Action? _topicConfiguration; + + /// + /// Sets an action to configure the Topic object before it is used for creation or update. + /// This allows for setting any property on the underlying Google Pub/Sub Topic object not exposed + /// directly by TopicAttributes. + /// + /// Action to configure the Topic + /// The builder instance for method chaining + public TopicAttributeBuilder SetTopicConfiguration(Action? topicConfiguration) + { + _topicConfiguration = topicConfiguration; + return this; + } + + private Action? _updateMaskConfiguration; + + /// + /// Sets an action to configure the FieldMask used when updating a Pub/Sub Topic. + /// This is required to explicitly tell the Pub/Sub API which fields are being changed. + /// Use this to include fields configured via TopicConfiguration that aren't standard Brighter attributes. + /// + /// Action to configure the FieldMask + /// The builder instance for method chaining + public TopicAttributeBuilder SetUpdateMaskConfiguration(Action? updateMaskConfiguration) + { + _updateMaskConfiguration = updateMaskConfiguration; + return this; + } + + /// + /// Builds a TopicAttributes instance with the configured values. + /// + /// A configured TopicAttributes instance + internal TopicAttributes Build() + { + return new TopicAttributes + { + Name = _name, + ProjectId = _projectId, + Labels = _labels, + MessageRetentionDuration = _messageRetentionDuration, + StorePolicy = _storePolicy, + SchemaSettings = _schemaSettings, + KmsKeyName = _kmsKeyName, + TopicConfiguration = _topicConfiguration, + UpdateMaskConfiguration = _updateMaskConfiguration + }; + } +} \ No newline at end of file diff --git a/src/Fluent.Brighter.MongoDb/MongoDbConfigurator.cs b/src/Fluent.Brighter.MongoDb/MongoDbConfigurator.cs index b44124c..b90a621 100644 --- a/src/Fluent.Brighter.MongoDb/MongoDbConfigurator.cs +++ b/src/Fluent.Brighter.MongoDb/MongoDbConfigurator.cs @@ -59,8 +59,7 @@ public MongoDbConfigurator SetConnection(IAmAMongoDbConfiguration configuration) /// Thrown if called before . public MongoDbConfigurator UseInbox() { - UseInbox("inbox"); - return this; + return UseInbox(string.Empty); } /// @@ -72,16 +71,13 @@ public MongoDbConfigurator UseInbox() /// Thrown if called before . public MongoDbConfigurator UseInbox(string collectionName) { - if (string.IsNullOrEmpty(collectionName)) + return UseInbox(c => { - throw new ArgumentException("Collection name cannot be null or empty.", nameof(collectionName)); - } - - _action += fluent => fluent.Subscriptions(x => x - .UseMongoDbInbox(cfg => cfg - .SetCollection(collectionName) - .SetConfiguration(_configuration!))); - return this; + if (!string.IsNullOrEmpty(collectionName)) + { + c.SetName(collectionName); + } + }); } /// @@ -114,8 +110,7 @@ public MongoDbConfigurator UseInbox(ActionThrown if called before . public MongoDbConfigurator UseOutbox() { - UseOutbox("outbox"); - return this; + return UseOutbox(string.Empty); } /// @@ -124,17 +119,15 @@ public MongoDbConfigurator UseOutbox() /// The name of the MongoDB collection to use for the outbox. /// The current instance for method chaining. /// Thrown if is null or empty. - /// Thrown if called before . public MongoDbConfigurator UseOutbox(string collectionName) { - if (string.IsNullOrEmpty(collectionName)) - throw new ArgumentException("Collection name cannot be null or empty.", nameof(collectionName)); - - _action += fluent => fluent.Producers(x => x - .UseMongoDbOutbox(cfg => cfg - .SetCollection(collectionName) - .SetConfiguration(_configuration!))); - return this; + return UseOutbox(c => + { + if (!string.IsNullOrEmpty(collectionName)) + { + c.SetName(collectionName); + } + }); } /// @@ -172,8 +165,7 @@ public MongoDbConfigurator UseOutbox(ActionThrown if called before . public MongoDbConfigurator UseDistributedLock() { - UseDistributedLock("distributedLock"); - return this; + return UseDistributedLock(string.Empty); } /// @@ -185,14 +177,13 @@ public MongoDbConfigurator UseDistributedLock() /// Thrown if called before . public MongoDbConfigurator UseDistributedLock(string collectionName) { - if (string.IsNullOrEmpty(collectionName)) - throw new ArgumentException("Collection name cannot be null or empty.", nameof(collectionName)); - - _action += fluent => fluent.Producers(x => x - .UseMongoDbDistributedLock(cfg => cfg - .SetCollection(collectionName) - .SetConfiguration(_configuration!))); - return this; + return UseDistributedLock(c => + { + if (!string.IsNullOrEmpty(collectionName)) + { + c.SetName(collectionName); + } + }); } /// @@ -269,10 +260,12 @@ public MongoDbConfigurator UseLuggageStore(Action co MaxConnectionPoolSize = settings.MaxConnectionPoolSize, MinConnectionPoolSize = settings.MinConnectionPoolSize, Username = settings.Credential.Username, +#pragma warning disable CS0618 // Type or member is obsolete Password = settings.Credential.Password, ReadConcernLevel = settings.ReadConcern.Level, ReadPreference = settings.ReadPreference, ReplicaSetName = settings.ReplicaSetName, +#pragma warning restore CS0618 // Type or member is obsolete RetryReads = settings.RetryReads, RetryWrites = settings.RetryWrites, Scheme = settings.Scheme,