From e744fea8e7ab1b04d8c18c2d00c5a87cacfb5957 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Sousa?= Date: Tue, 12 Sep 2023 22:29:45 +0100 Subject: [PATCH] feat!: async support on message type and schema registry resolvers --- ...ncSchemaRegistryTypeNameResolverWrapper.cs | 14 -- .../IAsyncSchemaRegistryTypeNameResolver.cs | 17 -- .../ISchemaRegistryTypeNameResolver.cs | 4 +- .../SchemaRegistryTypeResolver.cs | 17 +- .../AssemblyInfo.cs | 3 + .../ConfluentAvroTypeNameResolver.cs | 4 +- .../AssemblyInfo.cs | 3 + .../ConfluentProtobufTypeNameResolver.cs | 2 +- .../AsyncMessageTypeResolverWrapper.cs | 21 -- .../DefaultTypeResolver.cs | 13 +- .../IAsyncMessageTypeResolver.cs | 24 --- .../IMessageTypeResolver.cs | 5 +- .../KafkaFlow.Serializer.csproj | 1 + .../SerializerConsumerMiddleware.cs | 15 +- .../SerializerProducerMiddleware.cs | 14 +- .../SingleMessageTypeResolver.cs | 6 +- .../DummyObjects/DummyProtobufObject.cs | 201 ++++++++++++++++++ .../DummyObjects/DummyProtobufObject.proto | 11 + .../KafkaFlow.UnitTests.csproj | 3 + .../ConfluentAvroTypeNameResolverTests.cs | 47 ++++ .../ConfluentProtobufTypeNameResolverTests.cs | 49 +++++ .../SchemaRegistryTypeResolverTests.cs | 47 ++++ .../SerializerConsumerMiddlewareTests.cs | 12 +- .../SerializerProducerMiddlewareTests.cs | 2 +- .../UnityDependencyResolver.cs | 2 - 25 files changed, 399 insertions(+), 138 deletions(-) delete mode 100644 src/KafkaFlow.SchemaRegistry/AsyncSchemaRegistryTypeNameResolverWrapper.cs delete mode 100644 src/KafkaFlow.SchemaRegistry/IAsyncSchemaRegistryTypeNameResolver.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs delete mode 100644 src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs delete mode 100644 src/KafkaFlow.Serializer/IAsyncMessageTypeResolver.cs create mode 100644 src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs create mode 100644 src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto create mode 100644 src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs create mode 100644 src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs create mode 100644 src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs diff --git a/src/KafkaFlow.SchemaRegistry/AsyncSchemaRegistryTypeNameResolverWrapper.cs b/src/KafkaFlow.SchemaRegistry/AsyncSchemaRegistryTypeNameResolverWrapper.cs deleted file mode 100644 index d91fee9fc..000000000 --- a/src/KafkaFlow.SchemaRegistry/AsyncSchemaRegistryTypeNameResolverWrapper.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace KafkaFlow -{ - using System.Threading.Tasks; - - internal class AsyncSchemaRegistryTypeNameResolverWrapper : IAsyncSchemaRegistryTypeNameResolver - { - private readonly ISchemaRegistryTypeNameResolver typeNameResolver; - - public AsyncSchemaRegistryTypeNameResolverWrapper(ISchemaRegistryTypeNameResolver typeNameResolver) => - this.typeNameResolver = typeNameResolver; - - public Task ResolveAsync(int schemaId) => Task.FromResult(this.typeNameResolver.Resolve(schemaId)); - } -} diff --git a/src/KafkaFlow.SchemaRegistry/IAsyncSchemaRegistryTypeNameResolver.cs b/src/KafkaFlow.SchemaRegistry/IAsyncSchemaRegistryTypeNameResolver.cs deleted file mode 100644 index 497964faa..000000000 --- a/src/KafkaFlow.SchemaRegistry/IAsyncSchemaRegistryTypeNameResolver.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace KafkaFlow -{ - using System.Threading.Tasks; - - /// - /// An interface to implement a type name resolver to messages serialized with schema registry serializers - /// - public interface IAsyncSchemaRegistryTypeNameResolver - { - /// - /// Resolve the message type name of a schema - /// - /// Identifier of the schema - /// - Task ResolveAsync(int schemaId); - } -} diff --git a/src/KafkaFlow.SchemaRegistry/ISchemaRegistryTypeNameResolver.cs b/src/KafkaFlow.SchemaRegistry/ISchemaRegistryTypeNameResolver.cs index 6d4773156..17cc95c82 100644 --- a/src/KafkaFlow.SchemaRegistry/ISchemaRegistryTypeNameResolver.cs +++ b/src/KafkaFlow.SchemaRegistry/ISchemaRegistryTypeNameResolver.cs @@ -1,5 +1,7 @@ namespace KafkaFlow { + using System.Threading.Tasks; + /// /// An interface to implement a type name resolver to messages serialized with schema registry serializers /// @@ -10,6 +12,6 @@ public interface ISchemaRegistryTypeNameResolver /// /// Identifier of the schema /// - string Resolve(int schemaId); + Task ResolveAsync(int schemaId); } } diff --git a/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs b/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs index 20185edc4..f0dd56cd9 100644 --- a/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs +++ b/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs @@ -10,34 +10,25 @@ namespace KafkaFlow /// /// The message type resolver to be used with schema registry serializers /// - public class SchemaRegistryTypeResolver : IAsyncMessageTypeResolver + public class SchemaRegistryTypeResolver : IMessageTypeResolver { private static readonly ConcurrentDictionary Types = new(); private static readonly SemaphoreSlim Semaphore = new(1, 1); - private readonly IAsyncSchemaRegistryTypeNameResolver typeNameResolver; + private readonly ISchemaRegistryTypeNameResolver typeNameResolver; /// /// Initializes a new instance of the class. /// /// A instance of the interface. public SchemaRegistryTypeResolver(ISchemaRegistryTypeNameResolver typeNameResolver) - : this(new AsyncSchemaRegistryTypeNameResolverWrapper(typeNameResolver)) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// A instance of the interface. - public SchemaRegistryTypeResolver(IAsyncSchemaRegistryTypeNameResolver typeNameResolver) { this.typeNameResolver = typeNameResolver; } /// - public async Task OnConsumeAsync(IMessageContext context) + public async ValueTask OnConsumeAsync(IMessageContext context) { var schemaId = BinaryPrimitives.ReadInt32BigEndian( ((byte[]) context.Message.Value).AsSpan().Slice(1, 4)); @@ -70,6 +61,6 @@ public async Task OnConsumeAsync(IMessageContext context) } /// - public Task OnProduceAsync(IMessageContext context) => Task.CompletedTask; + public ValueTask OnProduceAsync(IMessageContext context) => default(ValueTask); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs new file mode 100644 index 000000000..17459bd5a --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs index 8a8af39e1..c0fc1c9a2 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs @@ -1,10 +1,10 @@ -namespace KafkaFlow +namespace KafkaFlow.Serializer.SchemaRegistry { using System.Threading.Tasks; using Confluent.SchemaRegistry; using Newtonsoft.Json; - internal class ConfluentAvroTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver + internal class ConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver { private readonly ISchemaRegistryClient client; diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs new file mode 100644 index 000000000..17459bd5a --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs index f57f8fca7..8ae17ee63 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs @@ -6,7 +6,7 @@ namespace KafkaFlow using Google.Protobuf; using Google.Protobuf.Reflection; - internal class ConfluentProtobufTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver + internal class ConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver { private readonly ISchemaRegistryClient client; diff --git a/src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs b/src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs deleted file mode 100644 index fac02e462..000000000 --- a/src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs +++ /dev/null @@ -1,21 +0,0 @@ -namespace KafkaFlow -{ - using System; - using System.Threading.Tasks; - - internal class AsyncMessageTypeResolverWrapper : IAsyncMessageTypeResolver - { - private readonly IMessageTypeResolver typeResolver; - - public AsyncMessageTypeResolverWrapper(IMessageTypeResolver typeResolver) => this.typeResolver = typeResolver; - - public Task OnConsumeAsync(IMessageContext context) => - Task.FromResult(this.typeResolver.OnConsume(context)); - - public Task OnProduceAsync(IMessageContext context) - { - this.typeResolver.OnProduce(context); - return Task.CompletedTask; - } - } -} diff --git a/src/KafkaFlow.Serializer/DefaultTypeResolver.cs b/src/KafkaFlow.Serializer/DefaultTypeResolver.cs index 31cba1dff..6613db2c9 100644 --- a/src/KafkaFlow.Serializer/DefaultTypeResolver.cs +++ b/src/KafkaFlow.Serializer/DefaultTypeResolver.cs @@ -1,25 +1,26 @@ namespace KafkaFlow { using System; + using System.Threading.Tasks; internal class DefaultTypeResolver : IMessageTypeResolver { private const string MessageType = "Message-Type"; - public Type OnConsume(IMessageContext context) + public ValueTask OnConsumeAsync(IMessageContext context) { var typeName = context.Headers.GetString(MessageType); return typeName is null ? - null : - Type.GetType(typeName); + new ValueTask((Type) null) : + new ValueTask(Type.GetType(typeName)); } - public void OnProduce(IMessageContext context) + public ValueTask OnProduceAsync(IMessageContext context) { if (context.Message.Value is null) { - return; + return default(ValueTask); } var messageType = context.Message.Value.GetType(); @@ -27,6 +28,8 @@ public void OnProduce(IMessageContext context) context.Headers.SetString( MessageType, $"{messageType.FullName}, {messageType.Assembly.GetName().Name}"); + + return default(ValueTask); } } } diff --git a/src/KafkaFlow.Serializer/IAsyncMessageTypeResolver.cs b/src/KafkaFlow.Serializer/IAsyncMessageTypeResolver.cs deleted file mode 100644 index ffe4626bd..000000000 --- a/src/KafkaFlow.Serializer/IAsyncMessageTypeResolver.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace KafkaFlow -{ - using System; - using System.Threading.Tasks; - - /// - /// Used by the serializer middleware to resolve the type when consuming and store it when producing - /// - public interface IAsyncMessageTypeResolver - { - /// - /// Returns the message type when consuming - /// - /// The containing the message and the metadata - /// - Task OnConsumeAsync(IMessageContext context); - - /// - /// Stores the message type somewhere when producing - /// - /// The containing the message and the metadata - Task OnProduceAsync(IMessageContext context); - } -} diff --git a/src/KafkaFlow.Serializer/IMessageTypeResolver.cs b/src/KafkaFlow.Serializer/IMessageTypeResolver.cs index 5d6d87551..a7b9190d8 100644 --- a/src/KafkaFlow.Serializer/IMessageTypeResolver.cs +++ b/src/KafkaFlow.Serializer/IMessageTypeResolver.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.Threading.Tasks; /// /// Used by the serializer middleware to resolve the type when consuming and store it when producing @@ -12,12 +13,12 @@ public interface IMessageTypeResolver /// /// The containing the message and the metadata /// - Type OnConsume(IMessageContext context); + ValueTask OnConsumeAsync(IMessageContext context); /// /// Stores the message type somewhere when producing /// /// The containing the message and the metadata - void OnProduce(IMessageContext context); + ValueTask OnProduceAsync(IMessageContext context); } } diff --git a/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj b/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj index d0e384d2f..2a0a1c219 100644 --- a/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj +++ b/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj @@ -13,6 +13,7 @@ + diff --git a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs b/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs index e2bc353b1..a00a81d6b 100644 --- a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs +++ b/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs @@ -10,8 +10,7 @@ public class SerializerConsumerMiddleware : IMessageMiddleware { private readonly ISerializer serializer; - - private readonly IAsyncMessageTypeResolver typeResolver; + private readonly IMessageTypeResolver typeResolver; /// /// Initializes a new instance of the class. @@ -21,18 +20,6 @@ public class SerializerConsumerMiddleware : IMessageMiddleware public SerializerConsumerMiddleware( ISerializer serializer, IMessageTypeResolver typeResolver) - : this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver)) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// Instance of - /// Instance of - public SerializerConsumerMiddleware( - ISerializer serializer, - IAsyncMessageTypeResolver typeResolver) { this.serializer = serializer; this.typeResolver = typeResolver; diff --git a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs b/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs index b7c0da7c8..2adf0f443 100644 --- a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs +++ b/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs @@ -12,7 +12,7 @@ public class SerializerProducerMiddleware : IMessageMiddleware private readonly ISerializer serializer; - private readonly IAsyncMessageTypeResolver typeResolver; + private readonly IMessageTypeResolver typeResolver; /// /// Initializes a new instance of the class. @@ -22,18 +22,6 @@ public class SerializerProducerMiddleware : IMessageMiddleware public SerializerProducerMiddleware( ISerializer serializer, IMessageTypeResolver typeResolver) - : this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver)) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// Instance of - /// Instance of - public SerializerProducerMiddleware( - ISerializer serializer, - IAsyncMessageTypeResolver typeResolver) { this.serializer = serializer; this.typeResolver = typeResolver; diff --git a/src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs b/src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs index b1d22d054..ee4cbc35c 100644 --- a/src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs +++ b/src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.Threading.Tasks; /// /// The message type resolver to be used when all messages are the same type @@ -19,12 +20,13 @@ public SingleMessageTypeResolver(Type messageType) } /// - public Type OnConsume(IMessageContext context) => this.messageType; + public ValueTask OnConsumeAsync(IMessageContext context) => new ValueTask(this.messageType); /// - public void OnProduce(IMessageContext context) + public ValueTask OnProduceAsync(IMessageContext context) { // Do nothing + return default(ValueTask); } } } diff --git a/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs new file mode 100644 index 000000000..1bc60a9b1 --- /dev/null +++ b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs @@ -0,0 +1,201 @@ +// +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: DummyProtobufObject.proto +// +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +namespace KafkaFlow.UnitTests { + + /// Holder for reflection information generated from DummyProtobufObject.proto + public static partial class DummyProtobufObjectReflection { + + #region Descriptor + /// File descriptor for DummyProtobufObject.proto + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static DummyProtobufObjectReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "ChlEdW1teVByb3RvYnVmT2JqZWN0LnByb3RvEhNLYWZrYUZsb3cuVW5pdFRl", + "c3RzIjUKE0R1bW15UHJvdG9idWZPYmplY3QSDgoGZmllbGQxGAEgASgJEg4K", + "BmZpZWxkMhgCIAEoBWIGcHJvdG8z")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::KafkaFlow.UnitTests.DummyProtobufObject), global::KafkaFlow.UnitTests.DummyProtobufObject.Parser, new[]{ "Field1", "Field2" }, null, null, null, null) + })); + } + #endregion + + } + #region Messages + public sealed partial class DummyProtobufObject : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new DummyProtobufObject()); + private pb::UnknownFieldSet _unknownFields; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::KafkaFlow.UnitTests.DummyProtobufObjectReflection.Descriptor.MessageTypes[0]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public DummyProtobufObject() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public DummyProtobufObject(DummyProtobufObject other) : this() { + field1_ = other.field1_; + field2_ = other.field2_; + _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public DummyProtobufObject Clone() { + return new DummyProtobufObject(this); + } + + /// Field number for the "field1" field. + public const int Field1FieldNumber = 1; + private string field1_ = ""; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public string Field1 { + get { return field1_; } + set { + field1_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + /// Field number for the "field2" field. + public const int Field2FieldNumber = 2; + private int field2_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int Field2 { + get { return field2_; } + set { + field2_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as DummyProtobufObject); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(DummyProtobufObject other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Field1 != other.Field1) return false; + if (Field2 != other.Field2) return false; + return Equals(_unknownFields, other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + if (Field1.Length != 0) hash ^= Field1.GetHashCode(); + if (Field2 != 0) hash ^= Field2.GetHashCode(); + if (_unknownFields != null) { + hash ^= _unknownFields.GetHashCode(); + } + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + if (Field1.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Field1); + } + if (Field2 != 0) { + output.WriteRawTag(16); + output.WriteInt32(Field2); + } + if (_unknownFields != null) { + _unknownFields.WriteTo(output); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + if (Field1.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Field1); + } + if (Field2 != 0) { + size += 1 + pb::CodedOutputStream.ComputeInt32Size(Field2); + } + if (_unknownFields != null) { + size += _unknownFields.CalculateSize(); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(DummyProtobufObject other) { + if (other == null) { + return; + } + if (other.Field1.Length != 0) { + Field1 = other.Field1; + } + if (other.Field2 != 0) { + Field2 = other.Field2; + } + _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input); + break; + case 10: { + Field1 = input.ReadString(); + break; + } + case 16: { + Field2 = input.ReadInt32(); + break; + } + } + } + } + + } + + #endregion + +} + +#endregion Designer generated code diff --git a/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto new file mode 100644 index 000000000..7acea2bdb --- /dev/null +++ b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto @@ -0,0 +1,11 @@ +// can be generated using +// protoc --csharp_out=. DummyProtobufObject.proto + +syntax = "proto3"; + +package KafkaFlow.UnitTests; + +message DummyProtobufObject { + string field1 = 1; + int32 field2 = 2; +} diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index cfba1c2c7..9b1c45f82 100644 --- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -32,7 +32,10 @@ + + + diff --git a/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs new file mode 100644 index 000000000..5fdeb0921 --- /dev/null +++ b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs @@ -0,0 +1,47 @@ +namespace KafkaFlow.UnitTests.Middlewares.Serialization +{ + using System.Threading.Tasks; + using Confluent.SchemaRegistry; + using FluentAssertions; + using KafkaFlow.Serializer.SchemaRegistry; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + using Newtonsoft.Json; + + [TestClass] + public class ConfluentAvroTypeNameResolverTests + { + private readonly Mock schemaRegistryClient; + private readonly ConfluentAvroTypeNameResolver schemaRegistryTypeResolver; + + public ConfluentAvroTypeNameResolverTests() + { + this.schemaRegistryClient = new Mock(); + this.schemaRegistryTypeResolver = new ConfluentAvroTypeNameResolver(this.schemaRegistryClient.Object); + } + + [TestMethod] + public async Task ResolveAsync_ValidSchemaObject_ReturnsAvroFieldsInCorrectFormat() + { + // Arrange + var schemaId = 420; + var type = typeof(ConfluentAvroTypeNameResolverTests); + var schemaObj = new + { + Name = type.Name, + NameSpace = type.Namespace, + }; + + var schema = new Schema(JsonConvert.SerializeObject(schemaObj), SchemaType.Avro); + + this.schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, null)) + .ReturnsAsync(schema); + + // Act + var avroFields = await this.schemaRegistryTypeResolver.ResolveAsync(schemaId); + + // Assert + avroFields.Should().Be($"{schemaObj.NameSpace}.{schemaObj.Name}"); + } + } +} diff --git a/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs new file mode 100644 index 000000000..cd6ec854f --- /dev/null +++ b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs @@ -0,0 +1,49 @@ +namespace KafkaFlow.UnitTests.Middlewares.Serialization +{ + using System; + using System.Threading.Tasks; + using Confluent.SchemaRegistry; + + using FluentAssertions; + + using Google.Protobuf; + using KafkaFlow.Serializer.SchemaRegistry; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class ConfluentProtobufTypeNameResolverTests + { + private readonly Mock schemaRegistryClient; + private readonly ConfluentProtobufTypeNameResolver schemaRegistryTypeResolver; + + public ConfluentProtobufTypeNameResolverTests() + { + this.schemaRegistryClient = new Mock(); + this.schemaRegistryTypeResolver = new ConfluentProtobufTypeNameResolver(this.schemaRegistryClient.Object); + } + + [TestMethod] + public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields() + { + // Arrange + var schemaId = 420; + + var dummyProtobufObj = new DummyProtobufObject + { + Field1 = "Field1", + Field2 = 8, + }; + var base64Encoded = Convert.ToBase64String(dummyProtobufObj.ToByteArray()); + + this.schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized")) + .ReturnsAsync(new Schema(base64Encoded, SchemaType.Protobuf)); + + // Act + var protoFields = await this.schemaRegistryTypeResolver.ResolveAsync(schemaId); + + // Assert + protoFields.Should().NotBeNull(); + } + } +} diff --git a/src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs b/src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs new file mode 100644 index 000000000..38f720526 --- /dev/null +++ b/src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs @@ -0,0 +1,47 @@ +namespace KafkaFlow.UnitTests.Middlewares.Serialization +{ + using System; + using System.Buffers.Binary; + using System.Threading.Tasks; + using FluentAssertions; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class SchemaRegistryTypeResolverTests + { + private readonly Mock messageContextMock; + private readonly Mock schemaRegistryTypeNameResolverMock; + private readonly SchemaRegistryTypeResolver schemaRegistryTypeResolver; + private readonly byte[] messageKey = new byte[] { 0x18, 0x19 }; + private readonly byte[] messageValue = new byte[] { 0x20, 0x21, 0x22, 0x23, 0x24, 0x25 }; + + public SchemaRegistryTypeResolverTests() + { + this.messageContextMock = new Mock(); + this.messageContextMock.Setup(context => context.Message).Returns(new Message(messageKey, messageValue)); + this.schemaRegistryTypeNameResolverMock = new Mock(); + this.schemaRegistryTypeResolver = new SchemaRegistryTypeResolver(this.schemaRegistryTypeNameResolverMock.Object); + } + + [TestMethod] + public async Task OnConsumeAsync_WhenCalledTwice_TypeIsResolvedOnceThenTypeIsLoadedFromCache() + { + // Arrange + var expectedSchemaId = BinaryPrimitives.ReadInt32BigEndian( + this.messageValue.AsSpan().Slice(1, 4)); + + this.schemaRegistryTypeNameResolverMock.Setup( + resolver => resolver.ResolveAsync(expectedSchemaId)).ReturnsAsync(typeof(SchemaRegistryTypeResolverTests).FullName); + + // Act + await this.schemaRegistryTypeResolver.OnConsumeAsync(messageContextMock.Object); + var type = await this.schemaRegistryTypeResolver.OnConsumeAsync(messageContextMock.Object); + + // Assert + this.schemaRegistryTypeNameResolverMock.Verify(resolver => resolver.ResolveAsync(expectedSchemaId), Times.Once); + var expectedObject = (SchemaRegistryTypeResolverTests)Activator.CreateInstance(type); + expectedObject.Should().NotBeNull(); + } + } +} diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index 117cd9f50..cf0f1398b 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -39,8 +39,8 @@ public async Task Invoke_NullMessageType_ReturnWithoutCallingNext() .Returns(new Message(new byte[1], new byte[1])); this.typeResolverMock - .Setup(x => x.OnConsume(this.contextMock.Object)) - .Returns((Type)null); + .Setup(x => x.OnConsumeAsync(this.contextMock.Object)) + .ReturnsAsync((Type)null); // Act await this.target.Invoke(this.contextMock.Object, _ => this.SetNextCalled()); @@ -70,7 +70,7 @@ public async Task Invoke_NullMessage_CallNext() this.serializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); - this.typeResolverMock.Verify(x => x.OnConsume(It.IsAny()), Times.Never); + this.typeResolverMock.Verify(x => x.OnConsumeAsync(It.IsAny()), Times.Never); } [TestMethod] @@ -91,7 +91,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() this.serializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); - this.typeResolverMock.Verify(x => x.OnConsume(It.IsAny()), Times.Never); + this.typeResolverMock.Verify(x => x.OnConsumeAsync(It.IsAny()), Times.Never); } [TestMethod] @@ -119,8 +119,8 @@ public async Task Invoke_ValidMessage_Deserialize() .Returns(transformedContextMock.Object); this.typeResolverMock - .Setup(x => x.OnConsume(this.contextMock.Object)) - .Returns(messageType); + .Setup(x => x.OnConsumeAsync(this.contextMock.Object)) + .ReturnsAsync(messageType); this.serializerMock .Setup(x => x.DeserializeAsync(It.IsAny(), messageType, It.IsAny())) diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs index 0b45b7a1e..74030a595 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs @@ -48,7 +48,7 @@ public async Task Invoke_ValidMessage_Serialize() .SetupGet(x => x.Message) .Returns(deserializedMessage); - this.typeResolverMock.Setup(x => x.OnProduce(this.contextMock.Object)); + this.typeResolverMock.Setup(x => x.OnProduceAsync(this.contextMock.Object)); this.serializerMock .Setup( diff --git a/src/KafkaFlow.Unity/UnityDependencyResolver.cs b/src/KafkaFlow.Unity/UnityDependencyResolver.cs index ed3c8b3ad..35b06c43f 100644 --- a/src/KafkaFlow.Unity/UnityDependencyResolver.cs +++ b/src/KafkaFlow.Unity/UnityDependencyResolver.cs @@ -2,9 +2,7 @@ namespace KafkaFlow.Unity { using System; using System.Collections.Generic; - using System.Linq; using global::Unity; - using global::Unity.Resolution; /// /// Unity implementation of