Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: produce tombstone records #547

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using KafkaFlow.Middlewares.Serializer.Resolvers;
using Microsoft.IO;

Expand Down Expand Up @@ -38,18 +39,21 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
await _typeResolver.OnProduceAsync(context);

byte[] messageValue;
byte[] messageValue = Array.Empty<byte>();

using (var buffer = s_memoryStreamManager.GetStream())
if (context.Message.Value is not null)
{
await _serializer
.SerializeAsync(
context.Message.Value,
buffer,
new SerializerContext(context.ProducerContext.Topic))
.ConfigureAwait(false);

messageValue = buffer.ToArray();
using (var buffer = s_memoryStreamManager.GetStream())
{
await _serializer
.SerializeAsync(
context.Message.Value,
buffer,
new SerializerContext(context.ProducerContext.Topic))
.ConfigureAwait(false);

messageValue = buffer.ToArray();
}
}

await next(context.SetMessage(context.Message.Key, messageValue)).ConfigureAwait(false);
Expand Down
25 changes: 25 additions & 0 deletions tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ internal static class Bootstrapper
internal const string PauseResumeGroupId = "consumer-pause-resume";
internal const string AvroGroupId = "consumer-avro";
internal const string JsonGroupId = "consumer-json";
internal const string NullGroupId = "consumer-null";


private const string ProtobufTopicName = "test-protobuf";
private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr";
Expand All @@ -39,6 +41,7 @@ internal static class Bootstrapper
private const string ProtobufGzipTopicName = "test-protobuf-gzip";
private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
private const string AvroTopicName = "test-avro";
private const string NullTopicName = "test-null";

private static readonly Lazy<IServiceProvider> s_lazyProvider = new(SetupProvider);

Expand Down Expand Up @@ -198,6 +201,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.CreateTopicIfNotExists(JsonGzipTopicName, 2, 1)
.CreateTopicIfNotExists(ProtobufGzipTopicName, 2, 1)
.CreateTopicIfNotExists(ProtobufGzipTopicName2, 2, 1)
.CreateTopicIfNotExists(NullTopicName, 1, 1)
.AddConsumer(
consumer => consumer
.Topic(ProtobufTopicName)
Expand Down Expand Up @@ -249,6 +253,21 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf<MessageHandler>())))
.AddConsumer(
consumer => consumer
.Topic(NullTopicName)
.WithGroupId(NullGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddTypedHandlers(
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler<NullMessageHandler>()
)))
.AddConsumer(
consumer => consumer
.Topics(GzipTopicName)
Expand Down Expand Up @@ -298,6 +317,12 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()))
.AddProducer<NullProducer>(
producer => producer
.DefaultTopic(NullTopicName)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()))
.AddProducer<JsonGzipProducer>(
producer => producer
.DefaultTopic(JsonGzipTopicName)
Expand Down
21 changes: 21 additions & 0 deletions tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal static class MessageStorage
private static readonly ConcurrentBag<TestProtoMessage> s_protoMessages = new();
private static readonly ConcurrentBag<(long, int)> s_versions = new();
private static readonly ConcurrentBag<byte[]> s_byteMessages = new();
private static readonly ConcurrentBag<byte[]> s_nullMessages = new();

public static void Add(ITestMessage message)
{
Expand All @@ -39,6 +40,11 @@ public static void Add(byte[] message)
s_byteMessages.Add(message);
}

public static void AddNullMessage(byte[] message)
{
s_nullMessages.Add(message);
}

public static async Task AssertCountMessageAsync(ITestMessage message, int count)
{
var start = DateTime.Now;
Expand Down Expand Up @@ -119,6 +125,21 @@ public static async Task AssertMessageAsync(byte[] message)
}
}

public static async Task AssertNullMessageAsync()
{
var start = DateTime.Now;
while (!s_nullMessages.IsEmpty)
{
if (DateTime.Now.Subtract(start).Seconds > TimeoutSec)
{
Assert.Fail("Null message not received");
return;
}

await Task.Delay(100).ConfigureAwait(false);
}
}

public static List<(long ticks, int version)> GetVersions()
{
return s_versions.ToList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;

namespace KafkaFlow.IntegrationTests.Core.Handlers;

internal class NullMessageHandler : IMessageHandler<byte[]>
{
public Task Handle(IMessageContext context, byte[] message)
{
MessageStorage.AddNullMessage(message);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.IntegrationTests.Core.Producers;

public class NullProducer
{

}
14 changes: 14 additions & 0 deletions tests/KafkaFlow.IntegrationTests/ProducerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,18 @@ public async Task ProduceNullKeyTest()
// Assert
await MessageStorage.AssertMessageAsync(message);
}

[TestMethod]
public async Task ProduceNullMessageTest()
{
// Arrange
var producer = _provider.GetRequiredService<IMessageProducer<NullProducer>>();
var key = Guid.NewGuid().ToString();

// Act
await producer.ProduceAsync(key, Array.Empty<byte>());

// Assert
await MessageStorage.AssertNullMessageAsync();
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -85,6 +86,60 @@ await _target.Invoke(
_typeResolverMock.VerifyAll();
}

[TestMethod]
public async Task Invoke_NullMessage_Serialize()
{
// Arrange
byte[] rawMessage = null;
var key = new object();
var deserializedMessage = new Message(key, new TestMessage());
IMessageContext resultContext = null;
var producerContext = new Mock<IProducerContext>();
producerContext.SetupGet(x => x.Topic).Returns("test-topic");

var transformedContextMock = new Mock<IMessageContext>();

_contextMock
.SetupGet(x => x.Message)
.Returns(deserializedMessage);

_typeResolverMock.Setup(x => x.OnProduceAsync(_contextMock.Object));

_serializerMock
.Setup(
x => x.SerializeAsync(
deserializedMessage.Value,
It.IsAny<Stream>(),
It.IsAny<ISerializerContext>()))
.Callback((object _, Stream stream, ISerializerContext _) => stream.WriteAsync(rawMessage));

_contextMock
.Setup(x => x.SetMessage(key, It.IsAny<IEnumerable<byte>>()))
.Returns(transformedContextMock.Object);

_contextMock
.SetupGet(x => x.ProducerContext)
.Returns(producerContext.Object);

// Act
await _target.Invoke(
_contextMock.Object,
ctx =>
{
resultContext = ctx;
return Task.CompletedTask;
});

// Assert
resultContext.Should().NotBeNull();
resultContext.Should().Be(transformedContextMock.Object);
resultContext.Message.Value.Should().BeNull();
_contextMock.VerifyAll();
_serializerMock.VerifyAll();
_typeResolverMock.VerifyAll();
}


private class TestMessage
{
}
Expand Down
2 changes: 2 additions & 0 deletions website/docs/guides/middlewares/serializer-middleware.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Both classes can be provided as an argument through a factory method too.
For topics that have just one message type, use the `AddSingleTypeSerializer`/`AddSingleTypeDeserializer` method.
:::

Serializer middleware also handles the produce of tombstone records. The messages produced are `null` whenever the message value is null, but not when that value is an empty `byte` array.


```csharp
services.AddKafka(kafka => kafka
Expand Down
Loading