From b003eea365c5511ec73c27b46d66575743838ff6 Mon Sep 17 00:00:00 2001 From: TomMalow Date: Thu, 25 May 2023 10:27:32 +0200 Subject: [PATCH 1/2] feat(ServiceBus): add new PublishAsync to send mutiple messages in batches The new method will publish multiple messages in a batch. If the messages exceed the batch size, the current batch will be sent and remaining will be send in a new batch. Multiple batches can be sent The method will throw an exception if a single message exceeds the total allowed size of a single batch. Does not rollback any previous sent messages. --- .../ServiceBus/IServiceBusPublisher.cs | 20 +- .../ServiceBus/ServiceBusPublisher.cs | 56 +++- .../ServiceBus/ServiceBusPublisherTests.cs | 255 ++++++++++++++++++ 3 files changed, 329 insertions(+), 2 deletions(-) diff --git a/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs b/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs index 1dc16c2..c8f340f 100644 --- a/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs +++ b/src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs @@ -23,6 +23,24 @@ Task PublishAsync( TimeSpan? timeToLive = null, CancellationToken cancellationToken = default); + /// + /// Publishes multiple messages in batches. The list of messages will be split in multiple batches if the messages exceeds a single batch size. + /// + /// The topic or queue name. + /// The messages to be published. + /// Optional id for appending the message to a known session. If not set, then defaults to a new session. + /// Optional custom metadata about the message. + /// Optional for message to be consumed. If not set, then defaults to the value specified on queue or topic. + /// The used. + /// A representing the asynchronous operation. + Task PublishAsync( + string topicOrQueue, + IReadOnlyCollection messages, + string? sessionId = null, + IDictionary? properties = null, + TimeSpan? timeToLive = null, + CancellationToken cancellationToken = default); + /// /// Schedules a message for publishing at a later time. /// @@ -44,7 +62,7 @@ Task SchedulePublishAsync( CancellationToken cancellationToken = default); /// - /// Cansels a scheduled publish of a message if it has not been published yet. + /// Cancels a scheduled publish of a message if it has not been published yet. /// /// The topic or queue name. /// The sequence number of the scheduled message to cancel. diff --git a/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs b/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs index d2002e5..314d244 100644 --- a/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs +++ b/src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs @@ -28,6 +28,60 @@ public Task PublishAsync( cancellationToken); } + public async Task PublishAsync( + string topicOrQueue, + IReadOnlyCollection messages, + string? sessionId = null, + IDictionary? properties = null, + TimeSpan? timeToLive = null, + CancellationToken cancellationToken = default) + { + var sender = clientProvider.GetSender(topicOrQueue); + + var batch = await sender + .CreateMessageBatchAsync(cancellationToken) + .ConfigureAwait(false); + + foreach (var message in messages) + { + if (cancellationToken.IsCancellationRequested) + { + break; + } + + var busMessage = CreateServiceBusMessage( + sessionId, + JsonSerializer.Serialize(message), + properties, + timeToLive); + + if (batch.TryAddMessage(busMessage)) + { + continue; + } + + await sender + .SendMessagesAsync(batch, cancellationToken) + .ConfigureAwait(false); + + batch.Dispose(); + batch = await sender + .CreateMessageBatchAsync(cancellationToken) + .ConfigureAwait(false); + + if (!batch.TryAddMessage(busMessage)) + { + throw new InvalidOperationException("Unable to add message to batch. The message size exceeds what can be send in a batch"); + } + } + + await sender + .SendMessagesAsync(batch, cancellationToken) + .ConfigureAwait(false); + + batch.Dispose(); + } + public Task SchedulePublishAsync( string topicOrQueue, object message, @@ -89,5 +143,5 @@ private static ServiceBusMessage CreateServiceBusMessage( } return message; - } + } } \ No newline at end of file diff --git a/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs b/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs index 83d012d..8d1c1cd 100644 --- a/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs +++ b/test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs @@ -208,4 +208,259 @@ await sut.PublishAsync( .Should() .Be(TimeSpan.MaxValue); } + + [Theory, AutoNSubstituteData] + internal async Task Should_Get_ServiceBusSender_For_Topic_On_Batch( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + string topicName, + object messageBody, + IDictionary properties, + TimeSpan timeToLive, + string sessionId, + CancellationToken cancellationToken) + { + var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List()); + + provider + .GetSender(default!) + .ReturnsForAnyArgs(sender); + + sender + .CreateMessageBatchAsync(default!) + .ReturnsForAnyArgs(messageBatch); + + await sut.PublishAsync( + topicName, + new object[] { messageBody }, + sessionId, + properties, + timeToLive, + cancellationToken); + + _ = provider + .Received(1) + .GetSender(topicName); + } + + [Theory, AutoNSubstituteData] + internal async Task Should_Create_MessageBatch( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + string topicName, + object messageBody, + IDictionary properties, + TimeSpan timeToLive, + string sessionId, + CancellationToken cancellationToken) + { + var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List()); + + provider + .GetSender(default!) + .ReturnsForAnyArgs(sender); + + sender + .CreateMessageBatchAsync(default!) + .ReturnsForAnyArgs(messageBatch); + + await sut.PublishAsync( + topicName, + new object[] { messageBody }, + sessionId, + properties, + timeToLive, + cancellationToken); + + _ = await sender + .Received(1) + .CreateMessageBatchAsync(cancellationToken); + } + + [Theory, AutoNSubstituteData] + internal async Task Should_Send_Message_On_ServiceBusSender_On_Message_Batch( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + string topicName, + object messageBody, + IDictionary properties, + TimeSpan timeToLive, + string sessionId, + CancellationToken cancellationToken) + { + var batchList = new List(); + var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, batchList); + + provider + .GetSender(default!) + .ReturnsForAnyArgs(sender); + + sender + .CreateMessageBatchAsync(default!) + .ReturnsForAnyArgs(messageBatch); + + await sut.PublishAsync( + topicName, + new object[] { messageBody }, + sessionId, + properties, + timeToLive, + cancellationToken); + + var sendMessageBatch = sender + .ReceivedCallWithArgument(); + + sendMessageBatch.Count.Should().Be(1); + batchList.Count.Should().Be(1); + + batchList[0].MessageId + .Should() + .NotBeNullOrEmpty(); + batchList[0].Body + .ToString() + .Should() + .BeEquivalentTo(JsonSerializer.Serialize(messageBody)); + batchList[0].ApplicationProperties + .Should() + .BeEquivalentTo(properties); + batchList[0].TimeToLive + .Should() + .Be(timeToLive); + } + + [Theory, AutoNSubstituteData] + internal async Task Should_Create_New_MessageBatch_When_First_Batch_Is_Full( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + string topicName, + object messageBody, + IDictionary properties, + TimeSpan timeToLive, + string sessionId, + CancellationToken cancellationToken) + { + var firstBatchList = new List(); + var secondBatchList = new List(); + var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false); + var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList); + + provider + .GetSender(default!) + .ReturnsForAnyArgs(sender); + + sender + .CreateMessageBatchAsync(default!) + .ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch); + + await sut.PublishAsync( + topicName, + new object[] { messageBody }, + sessionId, + properties, + timeToLive, + cancellationToken); + + _ = await sender + .Received(2) + .CreateMessageBatchAsync(cancellationToken); + } + + [Theory, AutoNSubstituteData] + internal async Task Should_Send_Multiple_Batches_If_When_Messages_Exceeds_Single_Batch( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + string topicName, + object messageBody, + IDictionary properties, + TimeSpan timeToLive, + string sessionId, + CancellationToken cancellationToken) + { + var firstBatchList = new List(); + var secondBatchList = new List(); + var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false); + var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList); + + provider + .GetSender(default!) + .ReturnsForAnyArgs(sender); + + sender + .CreateMessageBatchAsync(default!) + .ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch); + + await sut.PublishAsync( + topicName, + new object[] { messageBody }, + sessionId, + properties, + timeToLive, + cancellationToken); + + _ = sender + .Received(1) + .SendMessagesAsync(firstMessageBatch, cancellationToken); + + _ = sender + .Received(1) + .SendMessagesAsync(secondMessageBatch, cancellationToken); + + firstBatchList.Should().BeEmpty(); + secondBatchList.Count.Should().Be(1); + + secondBatchList[0].MessageId + .Should() + .NotBeNullOrEmpty(); + secondBatchList[0].Body + .ToString() + .Should() + .BeEquivalentTo(JsonSerializer.Serialize(messageBody)); + secondBatchList[0].ApplicationProperties + .Should() + .BeEquivalentTo(properties); + secondBatchList[0].TimeToLive + .Should() + .Be(timeToLive); + } + + [Theory, AutoNSubstituteData] + internal Task Should_Throw_If_Message_Is_Too_Large_To_Fit_In_New_Batch( + [Frozen] IServiceBusSenderProvider provider, + ServiceBusPublisher sut, + [Substitute] ServiceBusSender sender, + string topicName, + object messageBody, + IDictionary properties, + TimeSpan timeToLive, + string sessionId, + CancellationToken cancellationToken) + { + var firstBatchList = new List(); + var secondBatchList = new List(); + var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false); + var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList, tryAddCallback: _ => false); + + provider + .GetSender(default!) + .ReturnsForAnyArgs(sender); + + sender + .CreateMessageBatchAsync(default!) + .ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch); + + var act = async () => await sut.PublishAsync( + topicName, + new object[] { messageBody }, + sessionId, + properties, + timeToLive, + cancellationToken); + + return act.Should().ThrowAsync(); + } } \ No newline at end of file From 7fc9130d74c746961168c53458aa2f8a782f31be Mon Sep 17 00:00:00 2001 From: TomMalow Date: Thu, 25 May 2023 12:00:23 +0200 Subject: [PATCH 2/2] chor: Update readme with Service Bus Batch publish description and example --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index b73f98b..cc058e1 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,27 @@ internal class BarPublisher } ``` +### Batch Publish + +Multiple messages can also be published in batches to a topic or queue. Simply call the `PublishAsync` method with a list of messages. The messages will be added to a batch until the batch is full before it the batch is published and continue to work on the remaining messages. This process continues until all messages are consumed. + +An `InvalidOperationException` is thrown if a single message cannot fit inside a batch by itself. In this case, any previous published batches will not be rolled back and any remaining messages will remain unpublished. + +```csharp +internal class BarBatchPublisher +{ + private readonly IServiceBusPublisher publisher; + + public BarBatchPublisher(IServiceBusPublisher publisher) + { + this.publisher = publisher; + } + + public Task Publish(object[] messages) + => publisher.PublishAsync("[existing servicebus topic]", messages); +} +``` + Here's a full example of how to use the publishers above using a Minimal API setup (SwaggerUI enabled) with a single endpoint called `POST /data` that accepts a simple request body `{ "a": "string", "b": "string", "c": "string" }` which publishes the request to an EventHub and a ServiceBus topic ```csharp