From b510e9a8ac7089bf8fa9fc5c168eb9af5cd8a42b Mon Sep 17 00:00:00 2001 From: Roger Castaldo Date: Wed, 29 Jan 2025 23:00:24 -0500 Subject: [PATCH] Version - 2.1 (#5) * type updating Updated Message types to allow both class and struct to allow for passing of simpler single data type messages for expanding capabilities Added in some built in encoders to encode specific lower level data types for efficiency and ease of use, also to avoid using JSON encoding when the data types are simple * updating external references Updating the externally referenced libraries across the board * updating actions updating actions * fixing build issue attempting to fix build issues with respect to vsxmd --- .github/workflows/unit-test-report.yml | 10 +- .github/workflows/unittests8x.yml | 10 +- Abstractions/Abstractions.csproj | 14 - .../Interfaces/IBaseContractConnection.cs | 14 +- .../Interfaces/IContractConnection.cs | 13 +- .../Interfaces/IMappableContractConnection.cs | 3 +- .../Interfaces/IMetricContractConnection.cs | 6 +- .../IMultiServiceContractConnection.cs | 13 +- Abstractions/Interfaces/IRecievedMessage.cs | 1 - .../IAfterDecodeSpecificTypeMiddleware.cs | 1 - .../IBeforeEncodeSpecificTypeMiddleware.cs | 1 - .../Middleware/ISpecificTypeMiddleware.cs | 1 - Abstractions/Messages/QueryResult.cs | 3 +- AutomatedTesting/AutomatedTesting.csproj | 8 +- AutomatedTesting/DefaultEncodersTests.cs | 311 ++++++++++++++++++ Connectors/ActiveMQ/ActiveMQ.csproj | 4 - Connectors/ApachePulsar/ApachePulsar.csproj | 6 +- Connectors/ApachePulsar/Subscription.cs | 1 - .../AzureServiceBus/AzureServiceBus.csproj | 6 +- Connectors/GooglePubSub/Connection.cs | 1 - Connectors/GooglePubSub/GooglePubSub.csproj | 6 +- Connectors/GooglePubSub/Subscription.cs | 1 - Connectors/HiveMQ/HiveMQ.csproj | 6 +- Connectors/InMemory/InMemory.csproj | 7 - Connectors/Kafka/Kafka.csproj | 6 +- Connectors/KubeMQ/KubeMQ.csproj | 8 +- Connectors/NATS/NATS.csproj | 6 +- Connectors/RabbitMQ/RabbitMQ.csproj | 4 - Connectors/Redis/Redis.csproj | 6 +- Core/Connections/AConnection.cs | 26 +- Core/Connections/AMappableConnection.cs | 1 - Core/Connections/Connection.cs | 5 - Core/Connections/MappedConnection.cs | 6 - Core/Connections/MultiServiceConnection.cs | 3 - Core/Core.csproj | 15 +- Core/Defaults/BitConverterHelper.cs | 19 ++ Core/Defaults/BooleanEncoder.cs | 19 ++ Core/Defaults/ByteArrayEncoder.cs | 13 + Core/Defaults/ByteEncoder.cs | 18 + Core/Defaults/CharEncoder.cs | 18 + Core/Defaults/DecimalEncoder.cs | 37 +++ Core/Defaults/DoubleEncoder.cs | 18 + Core/Defaults/FloatEncoder.cs | 18 + Core/Defaults/HalfEncoder.cs | 18 + Core/Defaults/IntEncoder.cs | 18 + Core/Defaults/LongEncoder.cs | 18 + Core/Defaults/ShortEncoder.cs | 18 + Core/Defaults/UIntEncoder.cs | 18 + Core/Defaults/ULongEncoder.cs | 18 + Core/Defaults/UShortEncoder.cs | 18 + Core/Factories/MessageTypeFactory.cs | 3 +- Core/Interfaces/Conversion/IConversionPath.cs | 1 - Core/Interfaces/Factories/IMessageFactory.cs | 2 +- Core/Messages/RecievedMessage.cs | 1 - Core/Subscriptions/PubSubSubscription.cs | 1 - .../QueryResponseSubscription.cs | 1 - Core/Subscriptions/SubscriptionBase.cs | 1 - Core/Utility.cs | 2 - Samples/ApachePulsarSample/Program.cs | 1 - Shared.props | 17 +- 60 files changed, 661 insertions(+), 188 deletions(-) create mode 100644 AutomatedTesting/DefaultEncodersTests.cs create mode 100644 Core/Defaults/BitConverterHelper.cs create mode 100644 Core/Defaults/BooleanEncoder.cs create mode 100644 Core/Defaults/ByteArrayEncoder.cs create mode 100644 Core/Defaults/ByteEncoder.cs create mode 100644 Core/Defaults/CharEncoder.cs create mode 100644 Core/Defaults/DecimalEncoder.cs create mode 100644 Core/Defaults/DoubleEncoder.cs create mode 100644 Core/Defaults/FloatEncoder.cs create mode 100644 Core/Defaults/HalfEncoder.cs create mode 100644 Core/Defaults/IntEncoder.cs create mode 100644 Core/Defaults/LongEncoder.cs create mode 100644 Core/Defaults/ShortEncoder.cs create mode 100644 Core/Defaults/UIntEncoder.cs create mode 100644 Core/Defaults/ULongEncoder.cs create mode 100644 Core/Defaults/UShortEncoder.cs diff --git a/.github/workflows/unit-test-report.yml b/.github/workflows/unit-test-report.yml index 60e9c89..02c66e4 100644 --- a/.github/workflows/unit-test-report.yml +++ b/.github/workflows/unit-test-report.yml @@ -14,18 +14,18 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Setup .NET - uses: actions/setup-dotnet@v2 + uses: actions/setup-dotnet@v4 with: dotnet-version: | 8.0.x - name: Restore dependencies - run: dotnet restore -p:TargetFramework=net8.0 AutomatedTesting + run: dotnet restore -p:TargetFramework=net8.0 -p:BlockMD=true AutomatedTesting - name: Build-8.0 - run: dotnet build --framework net8.0 --no-restore AutomatedTesting + run: dotnet build --framework net8.0 -p:BlockMD=true --no-restore AutomatedTesting - name: Test-8.0 - run: dotnet test --framework net8.0 --no-build --verbosity normal --logger "trx;LogFileName=test-results.trx" --collect:"Code Coverage" AutomatedTesting + run: dotnet test --framework net8.0 -p:BlockMD=true --no-build --verbosity normal --logger "trx;LogFileName=test-results.trx" --collect:"Code Coverage" AutomatedTesting - name: report results uses: bibipkins/dotnet-test-reporter@v1.4.0 with: diff --git a/.github/workflows/unittests8x.yml b/.github/workflows/unittests8x.yml index bb4517f..286b450 100644 --- a/.github/workflows/unittests8x.yml +++ b/.github/workflows/unittests8x.yml @@ -12,15 +12,15 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Setup .NET - uses: actions/setup-dotnet@v2 + uses: actions/setup-dotnet@v4 with: dotnet-version: | 8.0.x - name: Restore dependencies - run: dotnet restore -p:TargetFramework=net8.0 AutomatedTesting + run: dotnet restore -p:TargetFramework=net8.0 -p:BlockMD=true AutomatedTesting - name: Build-8.0 - run: dotnet build --framework net8.0 --no-restore AutomatedTesting + run: dotnet build --framework net8.0 -p:BlockMD=true --no-restore AutomatedTesting - name: Test-8.0 - run: dotnet test --framework net8.0 --no-build --verbosity normal AutomatedTesting + run: dotnet test --framework net8.0 -p:BlockMD=true --no-build --verbosity normal AutomatedTesting diff --git a/Abstractions/Abstractions.csproj b/Abstractions/Abstractions.csproj index f6cb5da..13f58bf 100644 --- a/Abstractions/Abstractions.csproj +++ b/Abstractions/Abstractions.csproj @@ -11,18 +11,4 @@ Abstractions for MQContract - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - True - \ - - - diff --git a/Abstractions/Interfaces/IBaseContractConnection.cs b/Abstractions/Interfaces/IBaseContractConnection.cs index c7c9834..80165e6 100644 --- a/Abstractions/Interfaces/IBaseContractConnection.cs +++ b/Abstractions/Interfaces/IBaseContractConnection.cs @@ -19,8 +19,7 @@ public interface IBaseContractConnection : IDisposable, IAsyncDisposable /// A cancellation token /// /// A subscription instance that can be ended when desired - ValueTask SubscribeAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()) - where T : class; + ValueTask SubscribeAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to create a subscription into the underlying service Pub/Sub style and have the messages processed syncrhonously /// @@ -33,8 +32,7 @@ public interface IBaseContractConnection : IDisposable, IAsyncDisposable /// A cancellation token /// /// A subscription instance that can be ended when desired - ValueTask SubscribeAsync(Action> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()) - where T : class; + ValueTask SubscribeAsync(Action> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to create a subscription into the underlying service Query/Reponse style and have the messages processed asynchronously /// @@ -48,9 +46,7 @@ public interface IBaseContractConnection : IDisposable, IAsyncDisposable /// A cancellation token /// /// A subscription instance that can be ended when desired - ValueTask SubscribeQueryAsyncResponseAsync(Func, ValueTask>> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()) - where Q : class - where R : class; + ValueTask SubscribeQueryAsyncResponseAsync(Func, ValueTask>> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to create a subscription into the underlying service Query/Reponse style and have the messages processed synchronously /// @@ -64,9 +60,7 @@ public interface IBaseContractConnection : IDisposable, IAsyncDisposable /// A cancellation token /// /// A subscription instance that can be ended when desired - ValueTask SubscribeQueryResponseAsync(Func, QueryResponseMessage> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()) - where Q : class - where R : class; + ValueTask SubscribeQueryResponseAsync(Func, QueryResponseMessage> messageReceived, Action errorReceived, string? channel = null, string? group = null, bool ignoreMessageHeader = false, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to close off the contract connection and close it's underlying service connection /// diff --git a/Abstractions/Interfaces/IContractConnection.cs b/Abstractions/Interfaces/IContractConnection.cs index 31b2b2a..8c45017 100644 --- a/Abstractions/Interfaces/IContractConnection.cs +++ b/Abstractions/Interfaces/IContractConnection.cs @@ -23,8 +23,7 @@ public interface IContractConnection : IBaseContractConnection /// A cancellation token /// /// A result indicating the tranmission results - ValueTask PublishAsync(T message, string? channel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()) - where T : class; + ValueTask PublishAsync(T message, string? channel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to send a bulk set of messages into the underlying service Pub/Sub style /// @@ -34,8 +33,7 @@ public interface IContractConnection : IBaseContractConnection /// A cancellation token /// /// A result indicating the tranmission results - ValueTask> BulkPublishAsync(IEnumerable<(T message,MessageHeader? messageHeader)> messages, string? channel = null, CancellationToken cancellationToken = new CancellationToken()) - where T : class; + ValueTask> BulkPublishAsync(IEnumerable<(T message,MessageHeader? messageHeader)> messages, string? channel = null, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to send a message into the underlying service in the Query/Response style @@ -51,9 +49,7 @@ public interface IContractConnection : IBaseContractConnection /// A cancellation token /// /// A result indicating the success or failure as well as the returned message - ValueTask> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()) - where Q : class - where R : class; + ValueTask> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to send a message into the underlying service in the Query/Response style. The return type is not specified here and is instead obtained from the QueryResponseTypeAttribute /// attached to the Query message type class. @@ -68,7 +64,6 @@ public interface IContractConnection : IBaseContractConnection /// A cancellation token /// /// A result indicating the success or failure as well as the returned message - ValueTask> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()) - where Q : class; + ValueTask> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()); } } diff --git a/Abstractions/Interfaces/IMappableContractConnection.cs b/Abstractions/Interfaces/IMappableContractConnection.cs index a206058..348e4da 100644 --- a/Abstractions/Interfaces/IMappableContractConnection.cs +++ b/Abstractions/Interfaces/IMappableContractConnection.cs @@ -41,8 +41,7 @@ public interface IMappableContractConnection : IBaseContractConnection /// The name of the service connection, not necessarily unique, but can be used for logging and other things /// The service connection to use when the message is of type T /// - CC RegisterServiceConnection(string serviceConnectionName, IMessageServiceConnection messageServiceConnection) - where T : class; + CC RegisterServiceConnection(string serviceConnectionName, IMessageServiceConnection messageServiceConnection); /// /// Register a service connection to be used when the messageHeader contains the messageHeaderKey and it's value is messageHeaderValue /// diff --git a/Abstractions/Interfaces/IMetricContractConnection.cs b/Abstractions/Interfaces/IMetricContractConnection.cs index 9493b08..e5f26c9 100644 --- a/Abstractions/Interfaces/IMetricContractConnection.cs +++ b/Abstractions/Interfaces/IMetricContractConnection.cs @@ -32,8 +32,7 @@ CC RegisterMiddleware(Func constructInstance) /// The message type that this middleware is specifically called for /// The Contract Connection instance to allow chaining calls CC RegisterMiddleware() - where T : ISpecificTypeMiddleware - where M : class; + where T : ISpecificTypeMiddleware; /// /// Register a middleware of a given type T to be used by the contract connection /// @@ -42,8 +41,7 @@ CC RegisterMiddleware() /// The message type that this middleware is specifically called for /// The Contract Connection instance to allow chaining calls CC RegisterMiddleware(Func constructInstance) - where T : ISpecificTypeMiddleware - where M : class; + where T : ISpecificTypeMiddleware; /// /// Called to activate the metrics tracking middleware for this connection instance /// diff --git a/Abstractions/Interfaces/IMultiServiceContractConnection.cs b/Abstractions/Interfaces/IMultiServiceContractConnection.cs index 3788d07..9a4a33c 100644 --- a/Abstractions/Interfaces/IMultiServiceContractConnection.cs +++ b/Abstractions/Interfaces/IMultiServiceContractConnection.cs @@ -32,8 +32,7 @@ public interface IMultiServiceContractConnection : /// A cancellation token /// /// A result indicating the tranmission results - ValueTask PublishAsync(T message, string? channel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()) - where T : class; + ValueTask PublishAsync(T message, string? channel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to send a bulk set of messages into the underlying services Pub/Sub style /// @@ -43,8 +42,7 @@ public interface IMultiServiceContractConnection : /// A cancellation token /// /// A result indicating the tranmission results - ValueTask> BulkPublishAsync(IEnumerable<(T message, MessageHeader? messageHeader)> messages, string? channel = null, CancellationToken cancellationToken = new CancellationToken()) - where T : class; + ValueTask> BulkPublishAsync(IEnumerable<(T message, MessageHeader? messageHeader)> messages, string? channel = null, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to send a message into the underlying services in the Query/Response style /// @@ -59,9 +57,7 @@ public interface IMultiServiceContractConnection : /// A cancellation token /// /// A result indicating the success or failure as well as the returned message - ValueTask>> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()) - where Q : class - where R : class; + ValueTask>> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()); /// /// Called to send a message into the underlying services in the Query/Response style. The return type is not specified here and is instead obtained from the QueryResponseTypeAttribute /// attached to the Query message type class. @@ -76,7 +72,6 @@ public interface IMultiServiceContractConnection : /// A cancellation token /// /// A result indicating the success or failure as well as the returned message - ValueTask>> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()) - where Q : class; + ValueTask>> QueryAsync(Q message, TimeSpan? timeout = null, string? channel = null, string? responseChannel = null, MessageHeader? messageHeader = null, CancellationToken cancellationToken = new CancellationToken()); } } diff --git a/Abstractions/Interfaces/IRecievedMessage.cs b/Abstractions/Interfaces/IRecievedMessage.cs index ef94e4e..cf2f417 100644 --- a/Abstractions/Interfaces/IRecievedMessage.cs +++ b/Abstractions/Interfaces/IRecievedMessage.cs @@ -7,7 +7,6 @@ namespace MQContract.Interfaces /// /// The class type of the underlying message public interface IReceivedMessage - where T : class { /// /// The unique ID of the received message that was specified on the transmission side diff --git a/Abstractions/Interfaces/Middleware/IAfterDecodeSpecificTypeMiddleware.cs b/Abstractions/Interfaces/Middleware/IAfterDecodeSpecificTypeMiddleware.cs index d976e85..f38b79b 100644 --- a/Abstractions/Interfaces/Middleware/IAfterDecodeSpecificTypeMiddleware.cs +++ b/Abstractions/Interfaces/Middleware/IAfterDecodeSpecificTypeMiddleware.cs @@ -6,7 +6,6 @@ namespace MQContract.Interfaces.Middleware /// This interface represents a Middleware to execute after a Message of the given type T has been decoded from a ServiceMessage to the expected Class /// public interface IAfterDecodeSpecificTypeMiddleware : ISpecificTypeMiddleware - where T : class { /// /// This is the method invoked as part of the Middleware processing during message decoding diff --git a/Abstractions/Interfaces/Middleware/IBeforeEncodeSpecificTypeMiddleware.cs b/Abstractions/Interfaces/Middleware/IBeforeEncodeSpecificTypeMiddleware.cs index 50f914a..4ab5476 100644 --- a/Abstractions/Interfaces/Middleware/IBeforeEncodeSpecificTypeMiddleware.cs +++ b/Abstractions/Interfaces/Middleware/IBeforeEncodeSpecificTypeMiddleware.cs @@ -6,7 +6,6 @@ namespace MQContract.Interfaces.Middleware /// This interface represents a Middleware to execute Before a specific message type is encoded /// public interface IBeforeEncodeSpecificTypeMiddleware : ISpecificTypeMiddleware - where T : class { /// /// This is the method invoked as part of the Middle Ware processing during message encoding diff --git a/Abstractions/Interfaces/Middleware/ISpecificTypeMiddleware.cs b/Abstractions/Interfaces/Middleware/ISpecificTypeMiddleware.cs index af90822..8b94a34 100644 --- a/Abstractions/Interfaces/Middleware/ISpecificTypeMiddleware.cs +++ b/Abstractions/Interfaces/Middleware/ISpecificTypeMiddleware.cs @@ -4,7 +4,6 @@ /// Base Specific Type Middleware just used to limit Generic Types for Register Middleware /// public interface ISpecificTypeMiddleware - where T : class { } } diff --git a/Abstractions/Messages/QueryResult.cs b/Abstractions/Messages/QueryResult.cs index bb10d5d..d25b166 100644 --- a/Abstractions/Messages/QueryResult.cs +++ b/Abstractions/Messages/QueryResult.cs @@ -8,8 +8,7 @@ /// The response headers /// The resulting response if there was one /// The error message for the response if it failed and an error was returned - public record QueryResult(string ID,MessageHeader Header,T? Result=null,string? Error=null) + public record QueryResult(string ID,MessageHeader Header,T? Result=default,string? Error=null) : TransmissionResult(ID,Error) - where T : class {} } diff --git a/AutomatedTesting/AutomatedTesting.csproj b/AutomatedTesting/AutomatedTesting.csproj index 6ffa256..b2655aa 100644 --- a/AutomatedTesting/AutomatedTesting.csproj +++ b/AutomatedTesting/AutomatedTesting.csproj @@ -14,13 +14,13 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/AutomatedTesting/DefaultEncodersTests.cs b/AutomatedTesting/DefaultEncodersTests.cs new file mode 100644 index 0000000..982c6b5 --- /dev/null +++ b/AutomatedTesting/DefaultEncodersTests.cs @@ -0,0 +1,311 @@ +using AutomatedTesting.Messages; +using Moq; +using MQContract.Attributes; +using MQContract.Interfaces.Service; +using MQContract; +using System.Diagnostics; +using System.Security.Cryptography; +using MQContract.Interfaces; +using System.Runtime.InteropServices; + +namespace AutomatedTesting +{ + [TestClass] + public class DefaultEncodersTests + { + private const string ChannelName = "TestDirectEncoders"; + + [TestMethod] + public async Task TestByteArrayEncoder() + { + #region Arrange + var transmissionResult = new TransmissionResult(Guid.NewGuid().ToString()); + var serviceSubscription = new Mock(); + + var testMessage = RandomNumberGenerator.GetBytes(1024); + + List serviceMessages = []; + var actions = new List>(); + var recievedMessages = new List>(); + + var serviceConnection = new Mock(); + serviceConnection.Setup(x => x.SubscribeAsync(Capture.In(actions), It.IsAny>(), It.IsAny(), + It.IsAny(), It.IsAny())) + .ReturnsAsync(serviceSubscription.Object); + serviceConnection.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny())) + .Returns((ServiceMessage message, CancellationToken cancellationToken) => + { + var rmessage = Helper.ProduceReceivedServiceMessage(message); + serviceMessages.Add(rmessage); + foreach (var act in actions) + act(rmessage); + return ValueTask.FromResult(transmissionResult); + }); + + var contractConnection = ContractConnection.Instance(serviceConnection.Object); + #endregion + + #region Act + var subscription = await contractConnection.SubscribeAsync((msg) => recievedMessages.Add(msg), (err) => { }, ChannelName); + var result = await contractConnection.PublishAsync(testMessage,ChannelName); + #endregion + + #region Assert + Assert.IsTrue(await Helper.WaitForCount(recievedMessages, 1, TimeSpan.FromMinutes(1))); + + await subscription.EndAsync(); + + Assert.IsNotNull(result); + Assert.AreEqual(transmissionResult, result); + Assert.IsTrue(Enumerable.SequenceEqual(testMessage, recievedMessages[0].Message)); + Assert.IsTrue(Enumerable.SequenceEqual(testMessage, serviceMessages[0].Data.ToArray())); + #endregion + + #region Verify + serviceConnection.Verify(x => x.PublishAsync(It.IsAny(), It.IsAny()), Times.Once); + serviceConnection.Verify(x => x.SubscribeAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + #endregion + } + + [TestMethod] + public async Task TestBooleanEncoder() + { + #region Arrange + var transmissionResult = new TransmissionResult(Guid.NewGuid().ToString()); + var serviceSubscription = new Mock(); + + var testMessage = true; + + List serviceMessages = []; + var actions = new List>(); + var recievedMessages = new List>(); + + var serviceConnection = new Mock(); + serviceConnection.Setup(x => x.SubscribeAsync(Capture.In(actions), It.IsAny>(), It.IsAny(), + It.IsAny(), It.IsAny())) + .ReturnsAsync(serviceSubscription.Object); + serviceConnection.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny())) + .Returns((ServiceMessage message, CancellationToken cancellationToken) => + { + var rmessage = Helper.ProduceReceivedServiceMessage(message); + serviceMessages.Add(rmessage); + foreach (var act in actions) + act(rmessage); + return ValueTask.FromResult(transmissionResult); + }); + + var contractConnection = ContractConnection.Instance(serviceConnection.Object); + #endregion + + #region Act + var subscription = await contractConnection.SubscribeAsync((msg) => recievedMessages.Add(msg), (err) => { }, ChannelName); + var result = await contractConnection.PublishAsync(testMessage, ChannelName); + #endregion + + #region Assert + Assert.IsTrue(await Helper.WaitForCount(recievedMessages, 1, TimeSpan.FromMinutes(1))); + + await subscription.EndAsync(); + + Assert.IsNotNull(result); + Assert.AreEqual(transmissionResult, result); + Assert.AreEqual(testMessage, recievedMessages[0].Message); + Assert.IsTrue(Enumerable.SequenceEqual(BitConverter.GetBytes(testMessage), serviceMessages[0].Data.ToArray())); + #endregion + + #region Verify + serviceConnection.Verify(x => x.PublishAsync(It.IsAny(), It.IsAny()), Times.Once); + serviceConnection.Verify(x => x.SubscribeAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + #endregion + } + + [TestMethod] + public async Task TestByteEncoder() + { + #region Arrange + var transmissionResult = new TransmissionResult(Guid.NewGuid().ToString()); + var serviceSubscription = new Mock(); + + var testMessage = RandomNumberGenerator.GetBytes(1)[0]; + + List serviceMessages = []; + var actions = new List>(); + var recievedMessages = new List>(); + + var serviceConnection = new Mock(); + serviceConnection.Setup(x => x.SubscribeAsync(Capture.In(actions), It.IsAny>(), It.IsAny(), + It.IsAny(), It.IsAny())) + .ReturnsAsync(serviceSubscription.Object); + serviceConnection.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny())) + .Returns((ServiceMessage message, CancellationToken cancellationToken) => + { + var rmessage = Helper.ProduceReceivedServiceMessage(message); + serviceMessages.Add(rmessage); + foreach (var act in actions) + act(rmessage); + return ValueTask.FromResult(transmissionResult); + }); + + var contractConnection = ContractConnection.Instance(serviceConnection.Object); + #endregion + + #region Act + var subscription = await contractConnection.SubscribeAsync((msg) => recievedMessages.Add(msg), (err) => { }, ChannelName); + var result = await contractConnection.PublishAsync(testMessage, ChannelName); + #endregion + + #region Assert + Assert.IsTrue(await Helper.WaitForCount(recievedMessages, 1, TimeSpan.FromMinutes(1))); + + await subscription.EndAsync(); + + Assert.IsNotNull(result); + Assert.AreEqual(transmissionResult, result); + Assert.AreEqual(testMessage, recievedMessages[0].Message); + Assert.IsTrue(Enumerable.SequenceEqual([testMessage], serviceMessages[0].Data.ToArray())); + #endregion + + #region Verify + serviceConnection.Verify(x => x.PublishAsync(It.IsAny(), It.IsAny()), Times.Once); + serviceConnection.Verify(x => x.SubscribeAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + #endregion + } + + private async Task BitConverterTypeTest(T testMessage, byte[] convertedValue) + { + #region Arrange + var transmissionResult = new TransmissionResult(Guid.NewGuid().ToString()); + var serviceSubscription = new Mock(); + + List serviceMessages = []; + var actions = new List>(); + var recievedMessages = new List>(); + + var serviceConnection = new Mock(); + serviceConnection.Setup(x => x.SubscribeAsync(Capture.In(actions), It.IsAny>(), It.IsAny(), + It.IsAny(), It.IsAny())) + .ReturnsAsync(serviceSubscription.Object); + serviceConnection.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny())) + .Returns((ServiceMessage message, CancellationToken cancellationToken) => + { + var rmessage = Helper.ProduceReceivedServiceMessage(message); + serviceMessages.Add(rmessage); + foreach (var act in actions) + act(rmessage); + return ValueTask.FromResult(transmissionResult); + }); + + var contractConnection = ContractConnection.Instance(serviceConnection.Object); + #endregion + + #region Act + var subscription = await contractConnection.SubscribeAsync((msg) => recievedMessages.Add(msg), (err) => { }, ChannelName); + var result = await contractConnection.PublishAsync(testMessage, ChannelName); + #endregion + + #region Assert + Assert.IsTrue(await Helper.WaitForCount(recievedMessages, 1, TimeSpan.FromMinutes(1))); + + await subscription.EndAsync(); + + Assert.IsNotNull(result); + Assert.AreEqual(transmissionResult, result); + Assert.AreEqual(testMessage, recievedMessages[0].Message); + Assert.IsTrue(Enumerable.SequenceEqual(convertedValue, serviceMessages[0].Data.ToArray())); + #endregion + + #region Verify + serviceConnection.Verify(x => x.PublishAsync(It.IsAny(), It.IsAny()), Times.Once); + serviceConnection.Verify(x => x.SubscribeAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + #endregion + } + + [TestMethod] + public async Task TestCharEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(char)); + await BitConverterTypeTest(BitConverter.ToChar(binaryData), binaryData); + } + + [TestMethod] + public async Task TestDoubleEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(double)); + await BitConverterTypeTest(BitConverter.ToDouble(binaryData), binaryData); + } + + [TestMethod] + public async Task TestFloatEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(float)); + await BitConverterTypeTest(BitConverter.ToSingle(binaryData), binaryData); + } + + [TestMethod] + public async Task TestHalfEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(2); + await BitConverterTypeTest(BitConverter.ToHalf(binaryData), binaryData); + } + + [TestMethod] + public async Task TestIntEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(int)); + await BitConverterTypeTest(BitConverter.ToInt32(binaryData), binaryData); + } + + [TestMethod] + public async Task TestLongEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(long)); + await BitConverterTypeTest(BitConverter.ToInt64(binaryData), binaryData); + } + + [TestMethod] + public async Task TestShortEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(short)); + await BitConverterTypeTest(BitConverter.ToInt16(binaryData), binaryData); + } + + [TestMethod] + public async Task TestUIntEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(uint)); + await BitConverterTypeTest(BitConverter.ToUInt32(binaryData), binaryData); + } + + [TestMethod] + public async Task TestULongEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(ulong)); + await BitConverterTypeTest(BitConverter.ToUInt64(binaryData), binaryData); + } + + [TestMethod] + public async Task TestUShortEncoder() + { + var binaryData = RandomNumberGenerator.GetBytes(sizeof(ushort)); + await BitConverterTypeTest(BitConverter.ToUInt16(binaryData), binaryData); + } + + [TestMethod] + public async Task TestDecimalEncoder() + { + var value = new decimal( + BitConverter.ToInt32(RandomNumberGenerator.GetBytes(sizeof(int))), + BitConverter.ToInt32(RandomNumberGenerator.GetBytes(sizeof(int))), + BitConverter.ToInt32(RandomNumberGenerator.GetBytes(sizeof(int))), + RandomNumberGenerator.GetBytes(1)[0]<(byte.MaxValue/2), + (byte)(RandomNumberGenerator.GetBytes(1)[0]%28) + ); + var bits = decimal.GetBits(value); + var binaryData = new byte[sizeof(int)*bits.Length]; + for(var i=0;i(value, binaryData); + } + } +} diff --git a/Connectors/ActiveMQ/ActiveMQ.csproj b/Connectors/ActiveMQ/ActiveMQ.csproj index 1bab3fd..f42b317 100644 --- a/Connectors/ActiveMQ/ActiveMQ.csproj +++ b/Connectors/ActiveMQ/ActiveMQ.csproj @@ -12,10 +12,6 @@ - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/Connectors/ApachePulsar/ApachePulsar.csproj b/Connectors/ApachePulsar/ApachePulsar.csproj index 672ca4e..8c9a419 100644 --- a/Connectors/ApachePulsar/ApachePulsar.csproj +++ b/Connectors/ApachePulsar/ApachePulsar.csproj @@ -12,11 +12,7 @@ - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Connectors/ApachePulsar/Subscription.cs b/Connectors/ApachePulsar/Subscription.cs index 215d3d7..a54f496 100644 --- a/Connectors/ApachePulsar/Subscription.cs +++ b/Connectors/ApachePulsar/Subscription.cs @@ -1,6 +1,5 @@ using DotPulsar; using DotPulsar.Abstractions; -using DotPulsar.Extensions; using MQContract.Interfaces.Service; using MQContract.Messages; diff --git a/Connectors/AzureServiceBus/AzureServiceBus.csproj b/Connectors/AzureServiceBus/AzureServiceBus.csproj index 28ee9de..9d0c045 100644 --- a/Connectors/AzureServiceBus/AzureServiceBus.csproj +++ b/Connectors/AzureServiceBus/AzureServiceBus.csproj @@ -11,11 +11,7 @@ - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Connectors/GooglePubSub/Connection.cs b/Connectors/GooglePubSub/Connection.cs index 5190571..2a6d12e 100644 --- a/Connectors/GooglePubSub/Connection.cs +++ b/Connectors/GooglePubSub/Connection.cs @@ -2,7 +2,6 @@ using Google.Protobuf; using MQContract.Interfaces.Service; using MQContract.Messages; -using static Google.Cloud.PubSub.V1.PublisherClient; namespace MQContract.GooglePubSub { diff --git a/Connectors/GooglePubSub/GooglePubSub.csproj b/Connectors/GooglePubSub/GooglePubSub.csproj index b61f6c6..a990271 100644 --- a/Connectors/GooglePubSub/GooglePubSub.csproj +++ b/Connectors/GooglePubSub/GooglePubSub.csproj @@ -11,11 +11,7 @@ - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Connectors/GooglePubSub/Subscription.cs b/Connectors/GooglePubSub/Subscription.cs index 658c3f6..3cfbd6e 100644 --- a/Connectors/GooglePubSub/Subscription.cs +++ b/Connectors/GooglePubSub/Subscription.cs @@ -1,7 +1,6 @@ using Google.Cloud.PubSub.V1; using MQContract.Interfaces.Service; using MQContract.Messages; -using static Google.Cloud.PubSub.V1.SubscriberClient; namespace MQContract.GooglePubSub diff --git a/Connectors/HiveMQ/HiveMQ.csproj b/Connectors/HiveMQ/HiveMQ.csproj index d865173..6a3b742 100644 --- a/Connectors/HiveMQ/HiveMQ.csproj +++ b/Connectors/HiveMQ/HiveMQ.csproj @@ -14,11 +14,7 @@ - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Connectors/InMemory/InMemory.csproj b/Connectors/InMemory/InMemory.csproj index 2278a19..eb25d36 100644 --- a/Connectors/InMemory/InMemory.csproj +++ b/Connectors/InMemory/InMemory.csproj @@ -15,13 +15,6 @@ - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - diff --git a/Connectors/Kafka/Kafka.csproj b/Connectors/Kafka/Kafka.csproj index bac7ff6..fcf85ed 100644 --- a/Connectors/Kafka/Kafka.csproj +++ b/Connectors/Kafka/Kafka.csproj @@ -12,11 +12,7 @@ - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - + diff --git a/Connectors/KubeMQ/KubeMQ.csproj b/Connectors/KubeMQ/KubeMQ.csproj index 5abca5f..ac6c9be 100644 --- a/Connectors/KubeMQ/KubeMQ.csproj +++ b/Connectors/KubeMQ/KubeMQ.csproj @@ -12,13 +12,9 @@ - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Connectors/NATS/NATS.csproj b/Connectors/NATS/NATS.csproj index a1de10f..cb79096 100644 --- a/Connectors/NATS/NATS.csproj +++ b/Connectors/NATS/NATS.csproj @@ -12,11 +12,7 @@ - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - + diff --git a/Connectors/RabbitMQ/RabbitMQ.csproj b/Connectors/RabbitMQ/RabbitMQ.csproj index 2ccbdb5..d5e1d91 100644 --- a/Connectors/RabbitMQ/RabbitMQ.csproj +++ b/Connectors/RabbitMQ/RabbitMQ.csproj @@ -12,10 +12,6 @@ - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/Connectors/Redis/Redis.csproj b/Connectors/Redis/Redis.csproj index a5f70a6..e67e7ec 100644 --- a/Connectors/Redis/Redis.csproj +++ b/Connectors/Redis/Redis.csproj @@ -12,11 +12,7 @@ - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Core/Connections/AConnection.cs b/Core/Connections/AConnection.cs index 98ea283..cf1f9fe 100644 --- a/Core/Connections/AConnection.cs +++ b/Core/Connections/AConnection.cs @@ -35,7 +35,7 @@ internal abstract class AConnection(IMessageEncoder? defaultMessageEncoder = protected ILogger? Logger => logger; protected IDisposable? SetScope(string? messageID=null) => logger?.BeginScope($"Connection[{indentifier}]{(messageID==null ? "" : $"|Message[{messageID}]")}"); - protected IMessageFactory GetMessageFactory(uint? maxMessageBodySize,bool ignoreMessageHeader = false) where T : class + protected IMessageFactory GetMessageFactory(uint? maxMessageBodySize,bool ignoreMessageHeader = false) { using var scope = SetScope(); logger?.LogInformation("Obtaining message factory for {Type}", typeof(T)); @@ -85,7 +85,6 @@ CC IMetricContractConnection.RegisterMiddleware(Func constructInsta => RegisterMiddleware(constructInstance()); private async ValueTask<(T message, string? channel, MessageHeader messageHeader)> BeforeMessageEncodeAsync(IContext context, T message, string? channel, MessageHeader messageHeader) - where T : class { using var scope = SetScope(); logger?.LogInformation("Executing Before Message Encode middleware for message of type {Type}", typeof(T)); @@ -136,7 +135,6 @@ private async ValueTask AfterMessageEncodeAsync(IContext cont } private async ValueTask<(T message, MessageHeader messageHeader)> AfterMessageDecodeAsync(IContext context, T message, string ID, MessageHeader messageHeader, DateTime receivedTimestamp, DateTime processedTimeStamp) - where T : class { using var scope = SetScope(ID); logger?.LogInformation("Executing After Message Decode middleware for message of type {Type}", typeof(T)); @@ -157,7 +155,6 @@ private async ValueTask AfterMessageEncodeAsync(IContext cont } protected async ValueTask ProduceServiceMessageAsync(ChannelMapper.MapTypes mapType, IMessageFactory messageFactory, T message, bool ignoreChannel, string? channel = null, MessageHeader? messageHeader = null) - where T : class { using var scope = SetScope(); logger?.LogDebug("Producing Service Message for message of type {Type}", typeof(T)); @@ -168,7 +165,6 @@ await messageFactory.ConvertMessageAsync(message, ignoreChannel, channel, messag ); } protected async ValueTask<(T message, MessageHeader header)> DecodeServiceMessageAsync(ChannelMapper.MapTypes mapType, IMessageFactory messageFactory, ReceivedServiceMessage message) - where T : class { using var scope = SetScope(message.ID); logger?.LogDebug("Decoding Service Message message of type {Type}", typeof(T)); @@ -216,17 +212,16 @@ private MetricsMiddleware? MetricsMiddleware #endregion #region Subscriptions - protected abstract ValueTask CreateSubscriptionAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken) - where T : class; + protected abstract ValueTask CreateSubscriptionAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken); - ValueTask IBaseContractConnection.SubscribeAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, CancellationToken cancellationToken) where T : class + ValueTask IBaseContractConnection.SubscribeAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, CancellationToken cancellationToken) { using var scope = SetScope(); logger?.LogDebug("Creating PubSub subscription for message type {T} on channel {Channel} in group {Group}", typeof(T), channel, group); return CreateSubscriptionAsync(messageReceived, errorReceived, channel, group, ignoreMessageHeader, false, cancellationToken); } - ValueTask IBaseContractConnection.SubscribeAsync(Action> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, CancellationToken cancellationToken) where T : class + ValueTask IBaseContractConnection.SubscribeAsync(Action> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, CancellationToken cancellationToken) { using var scope = SetScope(); logger?.LogDebug("Creating PubSub subscription for message type {T} on channel {Channel} in group {Group}", typeof(T), channel, group); @@ -238,9 +233,7 @@ ValueTask IBaseContractConnection.SubscribeAsync(Action ProduceSubscribeQueryResponseAsync(Func, ValueTask>> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken) - where Q : class - where R : class; + protected abstract ValueTask ProduceSubscribeQueryResponseAsync(Func, ValueTask>> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken); ValueTask IBaseContractConnection.SubscribeQueryAsyncResponseAsync(Func, ValueTask>> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, CancellationToken cancellationToken) { @@ -282,7 +275,6 @@ protected async ValueTask> BulkPublishAsync(IEnu #pragma warning disable S4136 // Method overloads should be grouped together protected async ValueTask CreateSubscriptionAsync(IMessageFactory messageFactory, IMessageServiceConnection serviceConnection, Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool synchronous, CancellationToken cancellationToken) #pragma warning restore S4136 // Method overloads should be grouped together - where T : class { using var scope = SetScope(); logger?.LogDebug("Creating PubSub Subscription for {T} on {Channel} in {Group}.", typeof(T), channel, group); @@ -379,7 +371,7 @@ private async ValueTask ProcessInboxMessageAsync(string conn } return tcs.Task.Result; } - protected async ValueTask> ProduceResultAsync(uint? maxMessageBodySize,ServiceQueryResult queryResult) where R : class + protected async ValueTask> ProduceResultAsync(uint? maxMessageBodySize,ServiceQueryResult queryResult) { using var scope = SetScope(queryResult.ID); logger?.LogDebug("Attempting to produce a Query Result of {R} from the Service Message of the type {MessageTypeID}", typeof(R), queryResult.MessageTypeID); @@ -417,8 +409,6 @@ protected async ValueTask> ProduceResultAsync(uint? maxMessage } protected async ValueTask> ExecuteQueryAsync(IMessageServiceConnection serviceConnection, ServiceMessage serviceMessage, TimeSpan? timeout = null, string? responseChannel = null,string connectionName = "DEFAULT", CancellationToken cancellationToken = new CancellationToken()) - where Q : class - where R : class { using var scope = SetScope(serviceMessage.ID); logger?.LogDebug("Attempting to execute a Query of {Q} with a response {R}", typeof(Q), typeof(R)); @@ -448,8 +438,6 @@ await ProcessInboxMessageAsync(connectionName, inboxMessageServiceConnection, se } protected async ValueTask> ProcessPubSubQuery(IMessageServiceConnection serviceConnection, string? responseChannel, TimeSpan? realTimeout, ServiceMessage serviceMessage, CancellationToken cancellationToken) - where Q : class - where R : class { using var scope = SetScope(); responseChannel ??=typeof(Q).GetCustomAttribute()?.Name; @@ -491,8 +479,6 @@ protected async ValueTask> ProcessPubSubQuery(IMessageServi } protected async ValueTask CreateSubscriptionAsync(IMessageFactory queryMessageFactory,IMessageFactory responseMessageFactory, IMessageServiceConnection serviceConnection, Func, ValueTask>> messageReceived, Action errorReceived, string? channel, string? group, bool synchronous, CancellationToken cancellationToken) - where Q : class - where R : class { using var scope = SetScope(); logger?.LogDebug("Creating QueryResponse subscription for {Q} answering with {R} on {Channel} in {Group}", typeof(Q), typeof(R), channel, group); diff --git a/Core/Connections/AMappableConnection.cs b/Core/Connections/AMappableConnection.cs index 3b121dd..55424aa 100644 --- a/Core/Connections/AMappableConnection.cs +++ b/Core/Connections/AMappableConnection.cs @@ -56,7 +56,6 @@ CC IMappableContractConnection.RegisterServiceConnection(string messageHeade } protected async ValueTask<(IEnumerable connections,string channel)> GetConnectionsAsync(string? channel, ChannelMapper.MapTypes mapTypes) - where T : class { channel = await Utility.GetChannelAsync((originalChannel) => MapChannel(mapTypes, originalChannel), channel); return (await GetConnectionsAsync(channel, typeof(T), new MessageHeader([])),channel); diff --git a/Core/Connections/Connection.cs b/Core/Connections/Connection.cs index d3643ea..bba32b5 100644 --- a/Core/Connections/Connection.cs +++ b/Core/Connections/Connection.cs @@ -46,7 +46,6 @@ protected override async ValueTask InternalDisposeAsync() #region PubSub protected override ValueTask CreateSubscriptionAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken) - where T : class => CreateSubscriptionAsync( GetMessageFactory(serviceConnection.MaxMessageBodySize, ignoreMessageHeader), serviceConnection, @@ -88,8 +87,6 @@ async ValueTask> IContractConnection.BulkPublish #region QueryResponse private async ValueTask> ProcessQueryAsync(Q message, TimeSpan? timeout, string? channel, string? responseChannel, MessageHeader? messageHeader, CancellationToken cancellationToken) - where Q : class - where R : class { using var scope = SetScope(); Logger?.LogDebug("Executing QueryResponse of {Q}, expecting {R} on {Channel} with {ResponseChannel}", typeof(Q),typeof(R),channel,responseChannel); @@ -134,8 +131,6 @@ async ValueTask> IContractConnection.QueryAsync(Q message } protected override async ValueTask ProduceSubscribeQueryResponseAsync(Func, ValueTask>> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken) - where Q : class - where R : class { using var scope = SetScope(); Logger?.LogDebug("Producing QueryResponse Subscription for {Q} responding with {R} on {Channel} in {Group}", typeof(Q), typeof(R), channel, group); diff --git a/Core/Connections/MappedConnection.cs b/Core/Connections/MappedConnection.cs index e1b400a..87bf2d1 100644 --- a/Core/Connections/MappedConnection.cs +++ b/Core/Connections/MappedConnection.cs @@ -57,7 +57,6 @@ protected override async ValueTask InternalDisposeAsync() } private new async ValueTask<(ServiceConnectionList.ServiceConnection connections, string channel)> GetConnectionsAsync(string? channel, ChannelMapper.MapTypes mapTypes) - where T : class { using var scope = SetScope(); Logger?.LogDebug("Locating a connection for {Channel}, {T} and {MapType}", channel, typeof(T), mapTypes); @@ -72,7 +71,6 @@ protected override async ValueTask InternalDisposeAsync() #region PubSub protected override async ValueTask CreateSubscriptionAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken) - where T : class { (var connection, channel) = await GetConnectionsAsync(channel, ChannelMapper.MapTypes.PublishSubscription); return await CreateSubscriptionAsync( @@ -120,8 +118,6 @@ async ValueTask> IContractConnection.BulkPublish #region QueryResponse private async ValueTask> ProcessQueryAsync(Q message, TimeSpan? timeout, string? channel, string? responseChannel, MessageHeader? messageHeader, CancellationToken cancellationToken) - where Q : class - where R : class { using var scope = SetScope(); Logger?.LogDebug("Executing QueryResponse of {Q}, expecting {R} on {Channel} with {ResponseChannel}", typeof(Q), typeof(R), channel, responseChannel); @@ -167,8 +163,6 @@ async ValueTask> IContractConnection.QueryAsync(Q message } protected override async ValueTask ProduceSubscribeQueryResponseAsync(Func, ValueTask>> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken) - where Q : class - where R : class { using var scope = SetScope(); Logger?.LogDebug("Producing QueryResponse Subscription for {Q} responding with {R} on {Channel} in {Group}", typeof(Q), typeof(R), channel, group); diff --git a/Core/Connections/MultiServiceConnection.cs b/Core/Connections/MultiServiceConnection.cs index 2dd9905..121768d 100644 --- a/Core/Connections/MultiServiceConnection.cs +++ b/Core/Connections/MultiServiceConnection.cs @@ -84,7 +84,6 @@ async ValueTask> IMultiServiceContractConne } protected override async ValueTask CreateSubscriptionAsync(Func, ValueTask> messageReceived, Action errorReceived, string? channel, string? group, bool ignoreMessageHeader, bool synchronous, CancellationToken cancellationToken) - where T : class { var messageFactory = GetMessageFactory(MaxMessageBodySize, ignoreMessageHeader); (var connections,channel) = await GetConnectionsAsync(channel, ChannelMapper.MapTypes.PublishSubscription); @@ -105,8 +104,6 @@ protected override async ValueTask CreateSubscriptionAsync(Fun #region QueryResponse private async ValueTask>> ProcessQueryAsync(Q message, TimeSpan? timeout, string? channel, string? responseChannel, MessageHeader? messageHeader, CancellationToken cancellationToken) - where Q : class - where R : class { using var scope = SetScope(); Logger?.LogDebug("Executing QueryResponse of {Q}, expecting {R} on {Channel} with {ResponseChannel}", typeof(Q), typeof(R), channel, responseChannel); diff --git a/Core/Core.csproj b/Core/Core.csproj index f42b6cc..00d8f11 100644 --- a/Core/Core.csproj +++ b/Core/Core.csproj @@ -13,21 +13,10 @@ - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + - - - - True - \ - - diff --git a/Core/Defaults/BitConverterHelper.cs b/Core/Defaults/BitConverterHelper.cs new file mode 100644 index 0000000..0bddf57 --- /dev/null +++ b/Core/Defaults/BitConverterHelper.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal static class BitConverterHelper + { + public static async ValueTask StreamToByteArray(Stream stream) + { + using var bufferedStream = new BufferedStream(stream); + using var memoryStream = new MemoryStream(); + await bufferedStream.CopyToAsync(memoryStream); + return memoryStream.ToArray(); + } + } +} diff --git a/Core/Defaults/BooleanEncoder.cs b/Core/Defaults/BooleanEncoder.cs new file mode 100644 index 0000000..a6eb89c --- /dev/null +++ b/Core/Defaults/BooleanEncoder.cs @@ -0,0 +1,19 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class BooleanEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToBoolean(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(bool message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/ByteArrayEncoder.cs b/Core/Defaults/ByteArrayEncoder.cs new file mode 100644 index 0000000..97537aa --- /dev/null +++ b/Core/Defaults/ByteArrayEncoder.cs @@ -0,0 +1,13 @@ +using MQContract.Interfaces.Encoding; + +namespace MQContract.Defaults +{ + internal class ByteArrayEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => await BitConverterHelper.StreamToByteArray(stream); + + ValueTask IMessageTypeEncoder.EncodeAsync(byte[] message) + => ValueTask.FromResult(message); + } +} diff --git a/Core/Defaults/ByteEncoder.cs b/Core/Defaults/ByteEncoder.cs new file mode 100644 index 0000000..db8b8c3 --- /dev/null +++ b/Core/Defaults/ByteEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class ByteEncoder : IMessageTypeEncoder + { + ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => ValueTask.FromResult((byte)stream.ReadByte()); + + ValueTask IMessageTypeEncoder.EncodeAsync(byte message) + => ValueTask.FromResult([message]); + } +} diff --git a/Core/Defaults/CharEncoder.cs b/Core/Defaults/CharEncoder.cs new file mode 100644 index 0000000..8b854d4 --- /dev/null +++ b/Core/Defaults/CharEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class CharEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToChar(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(char message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/DecimalEncoder.cs b/Core/Defaults/DecimalEncoder.cs new file mode 100644 index 0000000..d991efc --- /dev/null +++ b/Core/Defaults/DecimalEncoder.cs @@ -0,0 +1,37 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class DecimalEncoder : IMessageTypeEncoder + { + private const int BitsPerDecimal = 4; + + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + { + var byteData = await BitConverterHelper.StreamToByteArray(stream); + + var bits = new int[BitsPerDecimal]; + for (var i = 0; i IMessageTypeEncoder.EncodeAsync(decimal message) + { + var result = new byte[sizeof(int)*BitsPerDecimal]; + + var bits = decimal.GetBits(message); + + for (var i = 0; i + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToDouble(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(double message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/FloatEncoder.cs b/Core/Defaults/FloatEncoder.cs new file mode 100644 index 0000000..f4a636a --- /dev/null +++ b/Core/Defaults/FloatEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class FloatEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToSingle(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(float message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/HalfEncoder.cs b/Core/Defaults/HalfEncoder.cs new file mode 100644 index 0000000..fa32b0b --- /dev/null +++ b/Core/Defaults/HalfEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class HalfEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToHalf(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(Half message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/IntEncoder.cs b/Core/Defaults/IntEncoder.cs new file mode 100644 index 0000000..a232eb3 --- /dev/null +++ b/Core/Defaults/IntEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class IntEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToInt32(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(int message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/LongEncoder.cs b/Core/Defaults/LongEncoder.cs new file mode 100644 index 0000000..b85e31e --- /dev/null +++ b/Core/Defaults/LongEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class ULongEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToUInt64(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(ulong message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/ShortEncoder.cs b/Core/Defaults/ShortEncoder.cs new file mode 100644 index 0000000..6230312 --- /dev/null +++ b/Core/Defaults/ShortEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class ShortEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToInt16(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(short message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/UIntEncoder.cs b/Core/Defaults/UIntEncoder.cs new file mode 100644 index 0000000..12f4a7c --- /dev/null +++ b/Core/Defaults/UIntEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class UIntEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToUInt32(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(uint message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/ULongEncoder.cs b/Core/Defaults/ULongEncoder.cs new file mode 100644 index 0000000..7750f81 --- /dev/null +++ b/Core/Defaults/ULongEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class LongEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToInt64(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(long message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Defaults/UShortEncoder.cs b/Core/Defaults/UShortEncoder.cs new file mode 100644 index 0000000..f54dc0b --- /dev/null +++ b/Core/Defaults/UShortEncoder.cs @@ -0,0 +1,18 @@ +using MQContract.Interfaces.Encoding; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MQContract.Defaults +{ + internal class UShortEncoder : IMessageTypeEncoder + { + async ValueTask IMessageTypeEncoder.DecodeAsync(Stream stream) + => BitConverter.ToUInt16(await BitConverterHelper.StreamToByteArray(stream)); + + ValueTask IMessageTypeEncoder.EncodeAsync(ushort message) + => ValueTask.FromResult(BitConverter.GetBytes(message)); + } +} diff --git a/Core/Factories/MessageTypeFactory.cs b/Core/Factories/MessageTypeFactory.cs index da9819f..021ec06 100644 --- a/Core/Factories/MessageTypeFactory.cs +++ b/Core/Factories/MessageTypeFactory.cs @@ -16,7 +16,7 @@ namespace MQContract.Factories { internal class MessageTypeFactory - : IMessageFactory where T : class + : IMessageFactory { private static Regex RegMetaData => new(@"^(U|C)-(.+)-((\d+\.)*(\d+))$", RegexOptions.Compiled, TimeSpan.FromMilliseconds(200)); @@ -79,7 +79,6 @@ public MessageTypeFactory(IMessageEncoder? globalMessageEncoder, IMessageEncrypt } private static IEnumerable> ProduceConverters(IEnumerable types, IMessageEncoder? globalMessageEncoder, IMessageEncryptor? globalMessageEncryptor, IServiceProvider? serviceProvider) - where M : class { var paths = types .Where(t => Array.Exists(t.GetInterfaces(), iface => iface.IsGenericType && diff --git a/Core/Interfaces/Conversion/IConversionPath.cs b/Core/Interfaces/Conversion/IConversionPath.cs index ead9f2e..a0ddd1c 100644 --- a/Core/Interfaces/Conversion/IConversionPath.cs +++ b/Core/Interfaces/Conversion/IConversionPath.cs @@ -4,7 +4,6 @@ namespace MQContract.Interfaces.Conversion { internal interface IConversionPath - where T : class { ValueTask ConvertMessageAsync(ILogger? logger, IEncodedMessage message, Stream? dataStream = null); } diff --git a/Core/Interfaces/Factories/IMessageFactory.cs b/Core/Interfaces/Factories/IMessageFactory.cs index b3cc59b..d010534 100644 --- a/Core/Interfaces/Factories/IMessageFactory.cs +++ b/Core/Interfaces/Factories/IMessageFactory.cs @@ -3,7 +3,7 @@ namespace MQContract.Interfaces.Factories { - internal interface IMessageFactory : IMessageTypeFactory, IConversionPath where T : class + internal interface IMessageFactory : IMessageTypeFactory, IConversionPath { string? MessageChannel { get; } ValueTask ConvertMessageAsync(T message,bool ignoreChannel, string? channel, MessageHeader messageHeader); diff --git a/Core/Messages/RecievedMessage.cs b/Core/Messages/RecievedMessage.cs index 6c779bf..e274197 100644 --- a/Core/Messages/RecievedMessage.cs +++ b/Core/Messages/RecievedMessage.cs @@ -4,6 +4,5 @@ namespace MQContract.Messages { internal record ReceivedMessage(string ID,T Message,MessageHeader Headers,DateTime ReceivedTimestamp,DateTime ProcessedTimestamp) : IReceivedMessage - where T : class {} } diff --git a/Core/Subscriptions/PubSubSubscription.cs b/Core/Subscriptions/PubSubSubscription.cs index ef30d7a..4ccdad2 100644 --- a/Core/Subscriptions/PubSubSubscription.cs +++ b/Core/Subscriptions/PubSubSubscription.cs @@ -8,7 +8,6 @@ internal sealed class PubSubSubscription(Func> mapChannel, string? channel = null, string? group = null, bool synchronous=false,ILogger? logger=null) : SubscriptionBase(mapChannel,channel,synchronous,logger) - where T : class { public async ValueTask EstablishSubscriptionAsync(IMessageServiceConnection connection,CancellationToken cancellationToken) { diff --git a/Core/Subscriptions/QueryResponseSubscription.cs b/Core/Subscriptions/QueryResponseSubscription.cs index 8932fca..f5023c8 100644 --- a/Core/Subscriptions/QueryResponseSubscription.cs +++ b/Core/Subscriptions/QueryResponseSubscription.cs @@ -11,7 +11,6 @@ internal sealed class QueryResponseSubscription( string? channel = null, string? group = null, bool synchronous = false, ILogger? logger = null) : SubscriptionBase(mapChannel, channel, synchronous, logger) - where T : class { private ManualResetEventSlim? manualResetEvent = new(true); private CancellationTokenSource? token = new(); diff --git a/Core/Subscriptions/SubscriptionBase.cs b/Core/Subscriptions/SubscriptionBase.cs index c0643c3..3657971 100644 --- a/Core/Subscriptions/SubscriptionBase.cs +++ b/Core/Subscriptions/SubscriptionBase.cs @@ -6,7 +6,6 @@ namespace MQContract.Subscriptions { internal abstract class SubscriptionBase(Func> mapChannel, string? channel, bool synchronous,ILogger? logger) : ISubscription - where T : class { protected IServiceSubscription? serviceSubscription; private bool disposedValue; diff --git a/Core/Utility.cs b/Core/Utility.cs index 2470d97..b7afc20 100644 --- a/Core/Utility.cs +++ b/Core/Utility.cs @@ -37,11 +37,9 @@ internal static string MessageVersionString(Type messageType) } internal async static ValueTask GetChannelAsync(Func> mapChannel, string? channel = null) - where T : class => await mapChannel(channel??typeof(T).GetCustomAttribute(false)?.Name??throw new MessageChannelNullException()); internal static string GetChannel(Func> mapChannel, string? channel = null) - where T : class { var chan = channel??typeof(T).GetCustomAttribute(false)?.Name??throw new MessageChannelNullException(); var tsk = mapChannel(chan).AsTask(); diff --git a/Samples/ApachePulsarSample/Program.cs b/Samples/ApachePulsarSample/Program.cs index bfa5799..373d604 100644 --- a/Samples/ApachePulsarSample/Program.cs +++ b/Samples/ApachePulsarSample/Program.cs @@ -1,6 +1,5 @@ using DotPulsar; using Messages; -using MQContract; using MQContract.ApachePulsar; var builder = PulsarClient.Builder() diff --git a/Shared.props b/Shared.props index f8c838f..c5d0704 100644 --- a/Shared.props +++ b/Shared.props @@ -1,6 +1,6 @@ - 2.0.1 + 2.1.0 https://github.com/roger-castaldo/MQContract Readme.md https://github.com/roger-castaldo/MQContract @@ -14,8 +14,21 @@ snupkg true $(MSBuildProjectDirectory)\Readme.md - True roger-castaldo $(AssemblyName) + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + True + \ + + \ No newline at end of file