diff --git a/src/OpenMessage.AWS.SQS/SqsConsumer.cs b/src/OpenMessage.AWS.SQS/SqsConsumer.cs index 4185045..7bad77f 100644 --- a/src/OpenMessage.AWS.SQS/SqsConsumer.cs +++ b/src/OpenMessage.AWS.SQS/SqsConsumer.cs @@ -78,14 +78,22 @@ public async Task>> ConsumeAsync(CancellationToken cancellati if (_acknowledgementAction is null) Throw.Exception("Acknowledgement action cannot be null for SQS message"); - result.Add(new SqsMessage(_acknowledgementAction) + try + { + result.Add(new SqsMessage(_acknowledgementAction) + { + Id = message.MessageId, + Properties = properties, + ReceiptHandle = message.ReceiptHandle, + QueueUrl = _currentConsumerOptions.QueueUrl, + Value = _deserializationProvider.From(message.Body, contentType, messageType ?? string.Empty) + }); + } + catch (Exception e) { - Id = message.MessageId, - Properties = properties, - ReceiptHandle = message.ReceiptHandle, - QueueUrl = _currentConsumerOptions.QueueUrl, - Value = _deserializationProvider.From(message.Body, contentType, messageType ?? string.Empty) - }); + // Swallow deserialization exception to prevent blocking the pipeline and processing of subsequent messages. + _logger.LogError(e,$"Error deserializing message body. {e.Message}. {nameof(message.MessageId)}:{message.MessageId}. {nameof(message.MD5OfBody)}:{message.MD5OfBody}"); + } } return result; diff --git a/tests/OpenMessage.Tests/Pipelines/BatcherTests.cs b/tests/OpenMessage.Tests/Pipelines/BatcherTests.cs index f233992..de9bfbf 100644 --- a/tests/OpenMessage.Tests/Pipelines/BatcherTests.cs +++ b/tests/OpenMessage.Tests/Pipelines/BatcherTests.cs @@ -31,9 +31,9 @@ await Task.WhenAll(Enumerable.Range(0, _batchSize - 1) Assert.Equal(1, _history.Count); - Assert.Equal(_batchSize - 1, _history.Single() - .Count); - Assert.True(stopwatch.Elapsed >= _timeout); + Assert.Equal(_batchSize - 1, _history.Single().Count); + + Assert.True(stopwatch.Elapsed.Add(TimeSpan.FromMilliseconds(5)) >= _timeout); } [Fact] @@ -49,8 +49,8 @@ await Task.WhenAll(Enumerable.Range(0, _batchSize) Assert.Equal(1, _history.Count); - Assert.Equal(_batchSize, _history.Single() - .Count); + Assert.Equal(_batchSize, _history.Single().Count); + Assert.True(stopwatch.Elapsed < _timeout); }