Skip to content

Commit

Permalink
Improve test resilience (#745)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkemkes authored Feb 2, 2023
1 parent 4cb644a commit 187f1f3
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,31 @@ public async Task Consume_PublishIntoExtensionDefinedTopic_ConsumedEqualsPublish
public async Task Consume_LimitMaxConcurrentMessages_StartProcessingLimitedNumberOfMessagesSimultaneously()
{
const int maxConcurrentMessages = 5;
var taskCompletionSource = new TaskCompletionSource();
var topic = _randomizerString.Generate();
const string message = "testMessage";
for (var i = 0; i < maxConcurrentMessages * 2; i++)
{
await PublishMessage(topic, "someKey", message);
}
var consumer = GetConsumer<string>(topic, maxConcurrentMessages);
var numberOfParallelMessages = 0;
var numberOfStartedMessages = 0;
consumer.ConsumeCallbackAsync = async (_, cancellationToken) =>
{
numberOfParallelMessages++;
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
numberOfParallelMessages--;
numberOfStartedMessages++;
taskCompletionSource.TrySetResult();
await Task.Delay(-1, cancellationToken); // Wait indefinitely
return await Task.FromResult(ProcessedMessageStatus.Success);
};

await consumer.StartAsync();
consumer.ExecuteAsync();
await Task.Delay(TimeSpan.FromSeconds(2));

Assert.Equal(maxConcurrentMessages, numberOfParallelMessages);
// Wait until processing begins
await taskCompletionSource.Task;
// Give consumer enough time to process further messages
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal(maxConcurrentMessages, numberOfStartedMessages);
}

private async Task PublishMessage(string topic, string key, string value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public async Task<MotorCloudEvent<byte[]>> GetMessageFromQueue(string queueName)
{
var message = (byte[])null;
var priority = (byte)0;
var taskCompletionSource = new TaskCompletionSource();

using (var channel = Fixture.Connection.CreateModel())
{
Expand All @@ -340,10 +341,11 @@ public async Task<MotorCloudEvent<byte[]>> GetMessageFromQueue(string queueName)
{
priority = args.BasicProperties.Priority;
message = args.Body.ToArray();
taskCompletionSource.TrySetResult();
};

channel.BasicConsume(queueName, false, consumer);
await Task.Delay(TimeSpan.FromSeconds(2));
await Task.WhenAny(taskCompletionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
}

var cloudEvent = MotorCloudEvent.CreateTestCloudEvent(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,32 +70,41 @@ public async Task ConsumerStartAsync_ConsumerWithDlxRejectMessage_MessageIsInDlx
Assert.Equal(message, results.TypedData);
}

[Theory]
[Theory(Timeout = 50000)]
[InlineData(false, 0)]
[InlineData(true, 1)]
public async Task ConsumerStartAsync_CallbackInvalidInput_VerifyCorrectBehaviorOnInvalidInput(bool republishOnInvalidInput, uint expectedNumberOfMessagesInDlxQueue)
{
var taskCompletionSource = new TaskCompletionSource();
var builder = RabbitMQTestBuilder
.CreateWithQueueDeclare(_fixture)
.WithDeadLetterExchange()
.WithRepublishToDeadLetterExchangeOnInvalidInput(republishOnInvalidInput)
.WithSingleRandomPublishedMessage()
.WithConsumerCallback((_, _) => Task.FromResult(ProcessedMessageStatus.InvalidInput))
.WithConsumerCallback((_, _) =>
{
taskCompletionSource.TrySetResult();
return Task.FromResult(ProcessedMessageStatus.InvalidInput);
})
.Build();
var consumer = builder.GetConsumer<string>();

await consumer.StartAsync();

await Task.Delay(TimeSpan.FromSeconds(2));
// Wait until processing begins
await taskCompletionSource.Task;
// Give RabbitMQConsumer enough time to acknowledge message
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal((uint)0, builder.MessagesInQueue(builder.QueueName));
Assert.Equal(expectedNumberOfMessagesInDlxQueue, builder.MessagesInQueue(builder.DlxQueueName));
}

[Fact]
[Fact(Timeout = 50000)]
public async Task ConsumerStartAsync_ConsumeMessage_ConsumedEqualsPublished()
{
const byte priority = 111;
var message = new byte[] { 1, 2, 3 };
var taskCompletionSource = new TaskCompletionSource();
byte? consumedPriority = null;
var consumedMessage = (byte[])null;
var builder = RabbitMQTestBuilder
Expand All @@ -105,15 +114,18 @@ public async Task ConsumerStartAsync_ConsumeMessage_ConsumedEqualsPublished()
{
consumedPriority = motorEvent.GetRabbitMQPriority();
consumedMessage = motorEvent.TypedData;
taskCompletionSource.TrySetResult();
return Task.FromResult(ProcessedMessageStatus.Success);
})
.Build();
var consumer = builder.GetConsumer<string>();

await consumer.StartAsync();

await Task.Delay(TimeSpan.FromSeconds(5));
Assert.NotNull(consumedMessage);
// Wait until processing begins
await taskCompletionSource.Task;
// Give RabbitMQConsumer enough time to acknowledge message
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal(priority, consumedPriority);
Assert.Equal(message, consumedMessage);
}
Expand Down Expand Up @@ -141,119 +153,165 @@ public async Task PublisherPublishMessageAsync_SomeMessage_PublishedEqualsConsum
Assert.Equal(message, results.Data);
}

[Fact]
[Fact(Timeout = 50000)]
public async Task ConsumerStartAsync_OneMessageInQueueAndConsumeCallbackSuccess_QueueEmptyAfterAck()
{

var taskCompletionSource = new TaskCompletionSource();
var builder = RabbitMQTestBuilder
.CreateWithoutQueueDeclare(_fixture)
.WithSingleRandomPublishedMessage()
.WithConsumerCallback((_, _) => Task.FromResult(ProcessedMessageStatus.Success))
.WithConsumerCallback((_, _) =>
{
taskCompletionSource.TrySetResult();
return Task.FromResult(ProcessedMessageStatus.Success);
})
.Build();
var consumer = builder.GetConsumer<string>();

await consumer.StartAsync();

await Task.Delay(TimeSpan.FromSeconds(2));
// Wait until processing begins
await taskCompletionSource.Task;
// Give RabbitMQConsumer enough time to acknowledge message
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal((uint)0, builder.MessagesInQueue(builder.QueueName));
}

[Fact]
public async Task ConsumerStartAsync_CheckParallelProcessing_EnsureAllMessagesAreConsumed()
{
const int raceConditionTimeout = 2;
const int messageProcessingTimeSeconds = 2;

const ushort messageCount = 99;
var synchronousExecutionTime = TimeSpan.FromSeconds(messageProcessingTimeSeconds * messageCount);
var builder = RabbitMQTestBuilder
.CreateWithQueueDeclare(_fixture)
.WithConsumerCallback(async (_, _) =>
.WithConsumerCallback(async (_, ct) =>
{
await Task.CompletedTask;
await Task.Delay(TimeSpan.FromSeconds(messageProcessingTimeSeconds));
await Task.Delay(TimeSpan.FromSeconds(messageProcessingTimeSeconds), ct);
return ProcessedMessageStatus.Success;
})
.WithMultipleRandomPublishedMessage()
.WithMultipleRandomPublishedMessage(messageCount)
.Build();
var consumer = builder.GetConsumer<string>();

await consumer.StartAsync();
var timeout = Task.Delay(synchronousExecutionTime);

await Task.Delay(TimeSpan.FromSeconds(messageProcessingTimeSeconds + raceConditionTimeout));
while (builder.MessagesInQueue(builder.QueueName) != 0 && !timeout.IsCompleted)
{
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
Assert.Equal((uint)0, builder.MessagesInQueue(builder.QueueName));
}

[Fact]
[Fact(Timeout = 50000)]
public async Task
ConsumerStartAsync_OneMessageInQueueAndConsumeCallbackReturnsTempFailureAndAfterwardsSuccess_MessageConsumedTwice()
{
var taskCompletionSource = new TaskCompletionSource();
var consumerCounter = 0;
var builder = RabbitMQTestBuilder
.CreateWithQueueDeclare(_fixture)
.WithSingleRandomPublishedMessage()

.WithConsumerCallback((_, _) =>
{
consumerCounter++;
return Task.FromResult(consumerCounter == 1
? ProcessedMessageStatus.TemporaryFailure
: ProcessedMessageStatus.Success);
if (consumerCounter == 1)
{
return Task.FromResult(ProcessedMessageStatus.TemporaryFailure);
}
taskCompletionSource.TrySetResult();
return Task.FromResult(ProcessedMessageStatus.Success);
})
.Build();
var consumer = builder.GetConsumer<string>();

await consumer.StartAsync();

await Task.Delay(TimeSpan.FromSeconds(2));
// Wait until second processing begins
await taskCompletionSource.Task;
// Give RabbitMQConsumer enough time to acknowledge message
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal(2, consumerCounter);
}

[Fact]
public async Task ConsumerStartAsync_OneMessageInQueueAndConsumeCallbackInvalidInput_QueueEmptyAfterReject()
{
var taskCompletionSource = new TaskCompletionSource();
var builder = RabbitMQTestBuilder
.CreateWithQueueDeclare(_fixture)
.WithSingleRandomPublishedMessage()
.WithConsumerCallback((_, _) => Task.FromResult(ProcessedMessageStatus.InvalidInput))
.WithConsumerCallback((_, _) =>
{
taskCompletionSource.TrySetResult();
return Task.FromResult(ProcessedMessageStatus.InvalidInput);
})
.Build();
var consumer = builder.GetConsumer<string>();

await consumer.StartAsync();

await Task.Delay(TimeSpan.FromSeconds(2));
// Wait until processing begins
await taskCompletionSource.Task;
// Give RabbitMQConsumer enough time to acknowledge message
await Task.Delay(TimeSpan.FromSeconds(1));
Assert.Equal((uint)0, builder.MessagesInQueue(builder.QueueName));
}


[Fact]
public async Task ConsumerStartAsync_ConsumeCallbackAsyncThrows_CriticalExitCalled()
{
var taskCompletionSource = new TaskCompletionSource();
var builder = RabbitMQTestBuilder
.CreateWithQueueDeclare(_fixture)
.WithSingleRandomPublishedMessage()
.WithConsumerCallback((_, _) => throw new Exception())
.WithConsumerCallback((_, _) =>
{
taskCompletionSource.TrySetResult();
throw new Exception();
})
.Build();

var applicationLifetimeMock = new Mock<IHostApplicationLifetime>();
var consumer = builder.GetConsumer<string>(applicationLifetimeMock.Object);

await consumer.StartAsync();

await Task.Delay(TimeSpan.FromSeconds(3));
// Wait until processing begins
await taskCompletionSource.Task;
// Give RabbitMQConsumer enough time to acknowledge message
await Task.Delay(TimeSpan.FromSeconds(1));
applicationLifetimeMock.VerifyUntilTimeoutAsync(t => t.StopApplication(), Times.Once);
}

[Fact]
public async Task ConsumerStartAsync_ConsumeCallbackReturnsACriticalStatus_CriticalExitCalled()
{
var taskCompletionSource = new TaskCompletionSource();
var builder = RabbitMQTestBuilder
.CreateWithQueueDeclare(_fixture)
.WithSingleRandomPublishedMessage()
.WithConsumerCallback((_, _) => Task.FromResult(ProcessedMessageStatus.CriticalFailure))
.WithConsumerCallback((_, _) =>
{
taskCompletionSource.TrySetResult();
return Task.FromResult(ProcessedMessageStatus.CriticalFailure);
})
.Build();

var applicationLifetimeMock = new Mock<IHostApplicationLifetime>();
var consumer = builder.GetConsumer<string>(applicationLifetimeMock.Object);

await consumer.StartAsync();

// Wait until processing begins
await taskCompletionSource.Task;
// Give RabbitMQConsumer enough time to acknowledge message
await Task.Delay(TimeSpan.FromSeconds(1));
applicationLifetimeMock.VerifyUntilTimeoutAsync(t => t.StopApplication(), Times.Once);
}

Expand Down Expand Up @@ -318,7 +376,7 @@ public async Task QueueMonitor_GetCurrentState_SingleMessage_ReadyMessagesOne()
{
Queue =
{
Name = builder.QueueName
Name = builder.QueueName
}
}),
_fixture.ConnectionFactory<string>()
Expand All @@ -344,7 +402,7 @@ public async Task QueueMonitor_GetCurrentState_MultipleMessages_ReadyMessagesGre
{
Queue =
{
Name = builder.QueueName
Name = builder.QueueName
}
}),
_fixture.ConnectionFactory<string>()
Expand Down Expand Up @@ -380,7 +438,7 @@ public async Task QueueMonitor_GetCurrentState_ActiveConsumer_ConsumerCountGreat
{
Queue =
{
Name = builder.QueueName
Name = builder.QueueName
}
}),
_fixture.ConnectionFactory<string>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,18 @@ private static IHost GetReverseStringService()

private static async Task<string> GetMessageFromDestinationQueue(IModel channel)
{
var taskCompletionSource = new TaskCompletionSource();
var destinationQueueName = Environment.GetEnvironmentVariable("DestinationQueueName");
var consumer = new EventingBasicConsumer(channel);
var messageFromDestinationQueue = string.Empty;
consumer.Received += (_, args) =>
{
var bytes = args.Body;
messageFromDestinationQueue = Encoding.UTF8.GetString(bytes.ToArray());
taskCompletionSource.TrySetResult();
};
channel.BasicConsume(destinationQueueName, false, consumer);
while (messageFromDestinationQueue == string.Empty)
{
await Task.Delay(TimeSpan.FromMilliseconds(50)).ConfigureAwait(false);
}
await Task.WhenAny(taskCompletionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));

return messageFromDestinationQueue;
}
Expand Down

0 comments on commit 187f1f3

Please sign in to comment.