Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: async support on message type and schema registry resolvers #430

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace KafkaFlow
{
using System.Threading.Tasks;

/// <summary>
/// An interface to implement a type name resolver to messages serialized with schema registry serializers
/// </summary>
Expand All @@ -10,6 +12,6 @@ public interface ISchemaRegistryTypeNameResolver
/// </summary>
/// <param name="schemaId">Identifier of the schema</param>
/// <returns></returns>
string Resolve(int schemaId);
Task<string> ResolveAsync(int schemaId);
}
}
17 changes: 4 additions & 13 deletions src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,25 @@ namespace KafkaFlow
/// <summary>
/// The message type resolver to be used with schema registry serializers
/// </summary>
public class SchemaRegistryTypeResolver : IAsyncMessageTypeResolver
public class SchemaRegistryTypeResolver : IMessageTypeResolver
{
private static readonly ConcurrentDictionary<int, Type> Types = new();

private static readonly SemaphoreSlim Semaphore = new(1, 1);

private readonly IAsyncSchemaRegistryTypeNameResolver typeNameResolver;
private readonly ISchemaRegistryTypeNameResolver typeNameResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SchemaRegistryTypeResolver"/> class.
/// </summary>
/// <param name="typeNameResolver">A instance of the <see cref="ISchemaRegistryTypeNameResolver"/> interface.</param>
public SchemaRegistryTypeResolver(ISchemaRegistryTypeNameResolver typeNameResolver)
: this(new AsyncSchemaRegistryTypeNameResolverWrapper(typeNameResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SchemaRegistryTypeResolver"/> class.
/// </summary>
/// <param name="typeNameResolver">A instance of the <see cref="ISchemaRegistryTypeNameResolver"/> interface.</param>
public SchemaRegistryTypeResolver(IAsyncSchemaRegistryTypeNameResolver typeNameResolver)
{
this.typeNameResolver = typeNameResolver;
}

/// <inheritdoc />
public async Task<Type> OnConsumeAsync(IMessageContext context)
public async ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var schemaId = BinaryPrimitives.ReadInt32BigEndian(
((byte[]) context.Message.Value).AsSpan().Slice(1, 4));
Expand Down Expand Up @@ -70,6 +61,6 @@ public async Task<Type> OnConsumeAsync(IMessageContext context)
}

/// <inheritdoc />
public Task OnProduceAsync(IMessageContext context) => Task.CompletedTask;
public ValueTask OnProduceAsync(IMessageContext context) => default(ValueTask);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
namespace KafkaFlow
namespace KafkaFlow.Serializer.SchemaRegistry
{
using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Newtonsoft.Json;

internal class ConfluentAvroTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver
internal class ConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver
{
private readonly ISchemaRegistryClient client;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace KafkaFlow
using Google.Protobuf;
using Google.Protobuf.Reflection;

internal class ConfluentProtobufTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver
internal class ConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver
{
private readonly ISchemaRegistryClient client;

Expand Down
21 changes: 0 additions & 21 deletions src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs

This file was deleted.

13 changes: 8 additions & 5 deletions src/KafkaFlow.Serializer/DefaultTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

internal class DefaultTypeResolver : IMessageTypeResolver
{
private const string MessageType = "Message-Type";

public Type OnConsume(IMessageContext context)
public ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var typeName = context.Headers.GetString(MessageType);

return typeName is null ?
null :
Type.GetType(typeName);
new ValueTask<Type>((Type) null) :
new ValueTask<Type>(Type.GetType(typeName));
}

public void OnProduce(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
if (context.Message.Value is null)
{
return;
return default(ValueTask);
}

var messageType = context.Message.Value.GetType();

context.Headers.SetString(
MessageType,
$"{messageType.FullName}, {messageType.Assembly.GetName().Name}");

return default(ValueTask);
}
}
}
24 changes: 0 additions & 24 deletions src/KafkaFlow.Serializer/IAsyncMessageTypeResolver.cs

This file was deleted.

5 changes: 3 additions & 2 deletions src/KafkaFlow.Serializer/IMessageTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Used by the serializer middleware to resolve the type when consuming and store it when producing
Expand All @@ -12,12 +13,12 @@ public interface IMessageTypeResolver
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
/// <returns></returns>
Type OnConsume(IMessageContext context);
ValueTask<Type> OnConsumeAsync(IMessageContext context);

/// <summary>
/// Stores the message type somewhere when producing
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
void OnProduce(IMessageContext context);
ValueTask OnProduceAsync(IMessageContext context);
}
}
1 change: 1 addition & 0 deletions src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.1.3" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

</Project>
15 changes: 1 addition & 14 deletions src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
public class SerializerConsumerMiddleware : IMessageMiddleware
{
private readonly ISerializer serializer;

private readonly IAsyncMessageTypeResolver typeResolver;
private readonly IMessageTypeResolver typeResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SerializerConsumerMiddleware"/> class.
Expand All @@ -21,18 +20,6 @@ public class SerializerConsumerMiddleware : IMessageMiddleware
public SerializerConsumerMiddleware(
ISerializer serializer,
IMessageTypeResolver typeResolver)
: this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SerializerConsumerMiddleware"/> class.
/// </summary>
/// <param name="serializer">Instance of <see cref="ISerializer"/></param>
/// <param name="typeResolver">Instance of <see cref="IAsyncMessageTypeResolver"/></param>
public SerializerConsumerMiddleware(
ISerializer serializer,
IAsyncMessageTypeResolver typeResolver)
{
this.serializer = serializer;
this.typeResolver = typeResolver;
Expand Down
14 changes: 1 addition & 13 deletions src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class SerializerProducerMiddleware : IMessageMiddleware

private readonly ISerializer serializer;

private readonly IAsyncMessageTypeResolver typeResolver;
private readonly IMessageTypeResolver typeResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SerializerProducerMiddleware"/> class.
Expand All @@ -22,18 +22,6 @@ public class SerializerProducerMiddleware : IMessageMiddleware
public SerializerProducerMiddleware(
ISerializer serializer,
IMessageTypeResolver typeResolver)
: this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SerializerProducerMiddleware"/> class.
/// </summary>
/// <param name="serializer">Instance of <see cref="ISerializer"/></param>
/// <param name="typeResolver">Instance of <see cref="IAsyncMessageTypeResolver"/></param>
public SerializerProducerMiddleware(
ISerializer serializer,
IAsyncMessageTypeResolver typeResolver)
{
this.serializer = serializer;
this.typeResolver = typeResolver;
Expand Down
6 changes: 4 additions & 2 deletions src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// The message type resolver to be used when all messages are the same type
Expand All @@ -19,12 +20,13 @@ public SingleMessageTypeResolver(Type messageType)
}

/// <inheritdoc/>
public Type OnConsume(IMessageContext context) => this.messageType;
public ValueTask<Type> OnConsumeAsync(IMessageContext context) => new ValueTask<Type>(this.messageType);

/// <inheritdoc/>
public void OnProduce(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
// Do nothing
return default(ValueTask);
}
}
}
Loading
Loading