Skip to content

Commit

Permalink
Merge pull request #5 from atc-net/feature/service-bus-publish-batch
Browse files Browse the repository at this point in the history
Add new PublishAsync to send multiple messages in batches
  • Loading branch information
TomMalow authored May 25, 2023
2 parents a23a511 + 7fc9130 commit fc4e224
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 2 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,27 @@ internal class BarPublisher
}
```

### Batch Publish

Multiple messages can also be published in batches to a topic or queue. Simply call the `PublishAsync` method with a list of messages. The messages will be added to a batch until the batch is full before it the batch is published and continue to work on the remaining messages. This process continues until all messages are consumed.

An `InvalidOperationException` is thrown if a single message cannot fit inside a batch by itself. In this case, any previous published batches will not be rolled back and any remaining messages will remain unpublished.

```csharp
internal class BarBatchPublisher
{
private readonly IServiceBusPublisher publisher;

public BarBatchPublisher(IServiceBusPublisher publisher)
{
this.publisher = publisher;
}

public Task Publish(object[] messages)
=> publisher.PublishAsync("[existing servicebus topic]", messages);
}
```

Here's a full example of how to use the publishers above using a Minimal API setup (SwaggerUI enabled) with a single endpoint called `POST /data` that accepts a simple request body `{ "a": "string", "b": "string", "c": "string" }` which publishes the request to an EventHub and a ServiceBus topic

```csharp
Expand Down
20 changes: 19 additions & 1 deletion src/Atc.Azure.Messaging/ServiceBus/IServiceBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ Task PublishAsync(
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Publishes multiple messages in batches. The list of messages will be split in multiple batches if the messages exceeds a single batch size.
/// </summary>
/// <param name="topicOrQueue">The topic or queue name.</param>
/// <param name="messages">The messages to be published.</param>
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
/// <param name="properties">Optional custom metadata about the message.</param>
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task PublishAsync(
string topicOrQueue,
IReadOnlyCollection<object> messages,
string? sessionId = null,
IDictionary<string, string>? properties = null,
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Schedules a message for publishing at a later time.
/// </summary>
Expand All @@ -44,7 +62,7 @@ Task<long> SchedulePublishAsync(
CancellationToken cancellationToken = default);

/// <summary>
/// Cansels a scheduled publish of a message if it has not been published yet.
/// Cancels a scheduled publish of a message if it has not been published yet.
/// </summary>
/// <param name="topicOrQueue">The topic or queue name.</param>
/// <param name="sequenceNumber">The sequence number of the scheduled message to cancel.</param>
Expand Down
56 changes: 55 additions & 1 deletion src/Atc.Azure.Messaging/ServiceBus/ServiceBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,60 @@ public Task PublishAsync(
cancellationToken);
}

public async Task PublishAsync(
string topicOrQueue,
IReadOnlyCollection<object> messages,
string? sessionId = null,
IDictionary<string, string>? properties = null,
TimeSpan? timeToLive = null,
CancellationToken cancellationToken = default)
{
var sender = clientProvider.GetSender(topicOrQueue);

var batch = await sender
.CreateMessageBatchAsync(cancellationToken)
.ConfigureAwait(false);

foreach (var message in messages)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}

var busMessage = CreateServiceBusMessage(
sessionId,
JsonSerializer.Serialize(message),
properties,
timeToLive);

if (batch.TryAddMessage(busMessage))
{
continue;
}

await sender
.SendMessagesAsync(batch, cancellationToken)
.ConfigureAwait(false);

batch.Dispose();
batch = await sender
.CreateMessageBatchAsync(cancellationToken)
.ConfigureAwait(false);

if (!batch.TryAddMessage(busMessage))
{
throw new InvalidOperationException("Unable to add message to batch. The message size exceeds what can be send in a batch");
}
}

await sender
.SendMessagesAsync(batch, cancellationToken)
.ConfigureAwait(false);

batch.Dispose();
}

public Task<long> SchedulePublishAsync(
string topicOrQueue,
object message,
Expand Down Expand Up @@ -89,5 +143,5 @@ private static ServiceBusMessage CreateServiceBusMessage(
}

return message;
}
}
}
255 changes: 255 additions & 0 deletions test/Atc.Azure.Messaging.Tests/ServiceBus/ServiceBusPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,259 @@ await sut.PublishAsync(
.Should()
.Be(TimeSpan.MaxValue);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Get_ServiceBusSender_For_Topic_On_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List<ServiceBusMessage>());

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(messageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = provider
.Received(1)
.GetSender(topicName);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Create_MessageBatch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, new List<ServiceBusMessage>());

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(messageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = await sender
.Received(1)
.CreateMessageBatchAsync(cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Send_Message_On_ServiceBusSender_On_Message_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var batchList = new List<ServiceBusMessage>();
var messageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, batchList);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(messageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

var sendMessageBatch = sender
.ReceivedCallWithArgument<ServiceBusMessageBatch>();

sendMessageBatch.Count.Should().Be(1);
batchList.Count.Should().Be(1);

batchList[0].MessageId
.Should()
.NotBeNullOrEmpty();
batchList[0].Body
.ToString()
.Should()
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
batchList[0].ApplicationProperties
.Should()
.BeEquivalentTo(properties);
batchList[0].TimeToLive
.Should()
.Be(timeToLive);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Create_New_MessageBatch_When_First_Batch_Is_Full(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var firstBatchList = new List<ServiceBusMessage>();
var secondBatchList = new List<ServiceBusMessage>();
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = await sender
.Received(2)
.CreateMessageBatchAsync(cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Send_Multiple_Batches_If_When_Messages_Exceeds_Single_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var firstBatchList = new List<ServiceBusMessage>();
var secondBatchList = new List<ServiceBusMessage>();
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);

await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

_ = sender
.Received(1)
.SendMessagesAsync(firstMessageBatch, cancellationToken);

_ = sender
.Received(1)
.SendMessagesAsync(secondMessageBatch, cancellationToken);

firstBatchList.Should().BeEmpty();
secondBatchList.Count.Should().Be(1);

secondBatchList[0].MessageId
.Should()
.NotBeNullOrEmpty();
secondBatchList[0].Body
.ToString()
.Should()
.BeEquivalentTo(JsonSerializer.Serialize(messageBody));
secondBatchList[0].ApplicationProperties
.Should()
.BeEquivalentTo(properties);
secondBatchList[0].TimeToLive
.Should()
.Be(timeToLive);
}

[Theory, AutoNSubstituteData]
internal Task Should_Throw_If_Message_Is_Too_Large_To_Fit_In_New_Batch(
[Frozen] IServiceBusSenderProvider provider,
ServiceBusPublisher sut,
[Substitute] ServiceBusSender sender,
string topicName,
object messageBody,
IDictionary<string, string> properties,
TimeSpan timeToLive,
string sessionId,
CancellationToken cancellationToken)
{
var firstBatchList = new List<ServiceBusMessage>();
var secondBatchList = new List<ServiceBusMessage>();
var firstMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, firstBatchList, tryAddCallback: _ => false);
var secondMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(265000, secondBatchList, tryAddCallback: _ => false);

provider
.GetSender(default!)
.ReturnsForAnyArgs(sender);

sender
.CreateMessageBatchAsync(default!)
.ReturnsForAnyArgs(firstMessageBatch, secondMessageBatch);

var act = async () => await sut.PublishAsync(
topicName,
new object[] { messageBody },
sessionId,
properties,
timeToLive,
cancellationToken);

return act.Should().ThrowAsync<InvalidOperationException>();
}
}

0 comments on commit fc4e224

Please sign in to comment.