diff --git a/src/BiDaFlow/Actors/Actor.cs b/src/BiDaFlow/Actors/Actor.cs index 023e079..4938388 100644 --- a/src/BiDaFlow/Actors/Actor.cs +++ b/src/BiDaFlow/Actors/Actor.cs @@ -52,16 +52,16 @@ protected Envelope CreateMessage(Action handler) }); } - protected EnvelopeWithReply CreateMessageWithReply(Func> handler, bool handleErrorByReceiver = false) + protected EnvelopeWithReply CreateMessageWithReply(Func> handler) { if (handler == null) throw new ArgumentNullException(nameof(handler)); - return new EnvelopeWithReply(this, handler, handleErrorByReceiver); + return new EnvelopeWithReply(this, handler); } - protected EnvelopeWithReply CreateMessageWithReply(Func handler, bool handleErrorByReceiver = false) + protected EnvelopeWithReply CreateMessageWithReply(Func handler) { if (handler == null) throw new ArgumentNullException(nameof(handler)); - return new EnvelopeWithReply(this, () => Task.FromResult(handler()), handleErrorByReceiver); + return new EnvelopeWithReply(this, () => Task.FromResult(handler())); } void IDataflowBlock.Complete() => this.Complete(); diff --git a/src/BiDaFlow/Actors/Envelope.cs b/src/BiDaFlow/Actors/Envelope.cs index ef6905b..918356a 100644 --- a/src/BiDaFlow/Actors/Envelope.cs +++ b/src/BiDaFlow/Actors/Envelope.cs @@ -36,18 +36,13 @@ public class EnvelopeWithReply { internal Actor Address { get; } internal Func> Action { get; } - internal bool HandleErrorByReceiver { get; } - internal EnvelopeWithReply(Actor address, Func> action, bool handleErrorByReceiver) + internal EnvelopeWithReply(Actor address, Func> action) { this.Address = address; this.Action = action; - this.HandleErrorByReceiver = handleErrorByReceiver; } - private const string DeclinedMessage = "The message was declined by the target block."; - private const string ActorCompletedMessage = "The message was enqueued to the actor but the actor has been completed."; - public Task PostAndReceiveReplyAsync() { var tcs = new TaskCompletionSource(); @@ -59,7 +54,7 @@ public Task PostAndReceiveReplyAsync() // Throw MessageNeverProcessedException when Address is completed this.Address.Completion.ContinueWith( (_, state) => ((TaskCompletionSource)state).TrySetException( - new MessageNeverProcessedException(ActorCompletedMessage)), + MessageNeverProcessedException.CreateCompleted()), tcs, cts.Token, TaskContinuationOptions.ExecuteSynchronously, @@ -68,7 +63,7 @@ public Task PostAndReceiveReplyAsync() } else { - tcs.TrySetException(new MessageNeverProcessedException(DeclinedMessage)); + tcs.TrySetException(MessageNeverProcessedException.CreateDeclined()); } return tcs.Task; @@ -94,7 +89,7 @@ public Task SendAndReceiveReplyAsync(CancellationToken cancellationToken { if (t.Result == false) { - tcs.TrySetException(new MessageNeverProcessedException(DeclinedMessage)); + tcs.TrySetException(MessageNeverProcessedException.CreateDeclined()); return; } } @@ -107,7 +102,7 @@ public Task SendAndReceiveReplyAsync(CancellationToken cancellationToken // Throw MessageNeverProcessedException when Address is completed this.Address.Completion.ContinueWith( (_, state) => ((TaskCompletionSource)state).TrySetException( - new MessageNeverProcessedException(ActorCompletedMessage)), + MessageNeverProcessedException.CreateCompleted()), tcs, cts.Token, TaskContinuationOptions.ExecuteSynchronously, diff --git a/src/BiDaFlow/Actors/MessageNeverProcessedException.cs b/src/BiDaFlow/Actors/MessageNeverProcessedException.cs index 225542e..82d501a 100644 --- a/src/BiDaFlow/Actors/MessageNeverProcessedException.cs +++ b/src/BiDaFlow/Actors/MessageNeverProcessedException.cs @@ -15,5 +15,15 @@ public MessageNeverProcessedException(string message) public MessageNeverProcessedException(string message, Exception innerException) : base(message, innerException) { } + + internal static MessageNeverProcessedException CreateDeclined() + { + return new MessageNeverProcessedException("The message was declined by the target block."); + } + + internal static MessageNeverProcessedException CreateCompleted() + { + return new MessageNeverProcessedException("The message was enqueued to the actor but the actor has been completed."); + } } } diff --git a/src/BiDaFlow/Actors/SupervisedBlockExtensions.cs b/src/BiDaFlow/Actors/SupervisedBlockExtensions.cs index db92a19..28a364c 100644 --- a/src/BiDaFlow/Actors/SupervisedBlockExtensions.cs +++ b/src/BiDaFlow/Actors/SupervisedBlockExtensions.cs @@ -135,7 +135,10 @@ public static bool Post(this SupervisedBlock supervisedActor, Fu var actor = actorOpt.Value; var envelope = createMessage(actor); - if (!ReferenceEquals(envelope?.Address, actor)) + if (envelope == null) + throw new InvalidOperationException("createMessage returned null."); + + if (!ReferenceEquals(envelope.Address, actor)) throw new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor."); return createMessage(actor).Post(); @@ -186,6 +189,9 @@ public static Task SendAsync( var actor = actorOpt.Value; var envelope = createMessage(actor); + if (envelope == null) + throw new InvalidOperationException("createMessage returned null."); + if (!ReferenceEquals(envelope.Address, actor)) { tcs.TrySetException(new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor.")); @@ -205,12 +211,15 @@ public static Task PostAndReceiveReplyAsync(this Supervi if (createMessage == null) throw new ArgumentNullException(nameof(createMessage)); var actorOpt = supervisedActor.CurrentBlock; - if (!actorOpt.HasValue) return Task.FromException(new MessageDeclinedException()); + if (!actorOpt.HasValue) return Task.FromException(MessageNeverProcessedException.CreateDeclined()); var actor = actorOpt.Value; var envelope = createMessage(actor); - if (!ReferenceEquals(envelope?.Address, actor)) + if (envelope == null) + throw new InvalidOperationException("createMessage returned null."); + + if (!ReferenceEquals(envelope.Address, actor)) throw new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor."); return createMessage(actor).PostAndReceiveReplyAsync(); @@ -226,7 +235,7 @@ public static Task SendAndReceiveReplyAsync( if (createMessage == null) throw new ArgumentNullException(nameof(createMessage)); if (supervisedActor.Completion.IsCompleted) - return Task.FromException(new MessageDeclinedException()); + return Task.FromException(MessageNeverProcessedException.CreateDeclined()); if (cancellationToken.IsCancellationRequested) return Task.FromCanceled(cancellationToken); @@ -248,7 +257,7 @@ public static Task SendAndReceiveReplyAsync( if (ex != null || completed) { - tcs.TrySetException(new MessageDeclinedException()); + tcs.TrySetException(MessageNeverProcessedException.CreateDeclined()); return; } @@ -261,6 +270,9 @@ public static Task SendAndReceiveReplyAsync( var actor = actorOpt.Value; var envelope = createMessage(actor); + if (envelope == null) + throw new InvalidOperationException("createMessage returned null."); + if (!ReferenceEquals(envelope.Address, actor)) { tcs.TrySetException(new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor.")); diff --git a/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs b/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs index 8530077..05d6b9f 100644 --- a/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs +++ b/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs @@ -427,7 +427,7 @@ private void CompleteCore(Exception? exception) else if (exception != null) exceptions.Add(exception); - this.ReleaseAllReservations(); + this.ReleasePostponedMessages(); } catch (Exception ex) { @@ -448,11 +448,12 @@ private void CompleteCore(Exception? exception) } } - private void ReleaseAllReservations() + private void ReleasePostponedMessages() { foreach (var message in this._offeringMessages) { - if (message.ReservedBy != null) + // https://github.com/dotnet/runtime/blob/89b8591928bcb9f90956c938fcd9fcfb2fdfb476/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/Common.cs#L508-L511 + if (message.ReservedBy != null || message.Source.ReserveMessage(message.SourceHeader, this)) { message.Source.ReleaseReservation(message.SourceHeader, this); } diff --git a/tests/BiDaFlow.Tests/Actors/ErrorHandlingTests.cs b/tests/BiDaFlow.Tests/Actors/ErrorHandlingTests.cs index cc484f6..4bc3cab 100644 --- a/tests/BiDaFlow.Tests/Actors/ErrorHandlingTests.cs +++ b/tests/BiDaFlow.Tests/Actors/ErrorHandlingTests.cs @@ -9,69 +9,36 @@ namespace BiDaFlow.Tests.Actors public partial class ErrorHandlingTests { [Fact] - public void TestThrowToSender() + public async Task TestThrowToSender() { var actor = new TestErrorHandlingActor(); - var aex = Assert - .Throws(() => actor.ThrowToSender() - .PostAndReceiveReplyAsync().Wait(TestUtils.CancelSometimeSoon())) - .Flatten(); + var ex = await Assert.ThrowsAsync(() => + actor.Throw().PostAndReceiveReplyAsync().CompleteSoon()); + ex.Message.Is("test1"); - aex.InnerExceptions.Count.Is(1); - aex.InnerException!.Message.Is("test1"); actor.Completion.IsCompleted.IsFalse(); } [Fact] - public void TestThrowToReceiver() + public async Task TestDiscardReply() { var actor = new TestErrorHandlingActor(); + actor.Throw().DiscardReply().Post().IsTrue(); - // If an exception is thrown to the receiver, the task should be canceled. - var aex = Assert.ThrowsAny(() => - actor.ThrowToReceiver().PostAndReceiveReplyAsync().Wait(TestUtils.CancelSometimeSoon())); - aex.InnerExceptions.Count.Is(1); - aex.InnerException!.IsInstanceOf(); - - aex = Assert - .Throws(() => actor.Completion.Wait(TestUtils.CancelSometimeSoon())) - .Flatten(); - aex.InnerExceptions.Count.Is(1); - aex.InnerException!.Message.Is("test2"); - } - - [Fact] - public void TestDiscardReply() - { - var actor = new TestErrorHandlingActor(); - actor.ThrowToSender().DiscardReply().Post().IsTrue(); - - // If the reply is discarded, the exception is thrown to the receiver even if handleErrorByReceiver is false. - var aex = Assert - .Throws(() => actor.Completion.Wait(TestUtils.CancelSometimeSoon())) - .Flatten(); - - aex.InnerExceptions.Count.Is(1); - aex.InnerException!.Message.Is("test1"); + // If the reply is discarded, the exception is thrown to the receiver. + var ex = await Assert.ThrowsAsync(() => actor.Completion.CompleteSoon()); + ex.Message.Is("test1"); } private class TestErrorHandlingActor : Actor { - public EnvelopeWithReply ThrowToSender() + public EnvelopeWithReply Throw() { return this.CreateMessageWithReply( new Func(() => throw new Exception("test1")) ); } - - public EnvelopeWithReply ThrowToReceiver() - { - return this.CreateMessageWithReply( - new Func(() => throw new Exception("test2")), - true - ); - } } } } diff --git a/tests/BiDaFlow.Tests/Blocks/BufferBlockTests.cs b/tests/BiDaFlow.Tests/Blocks/BufferBlockTests.cs index f0816b6..d8a6e01 100644 --- a/tests/BiDaFlow.Tests/Blocks/BufferBlockTests.cs +++ b/tests/BiDaFlow.Tests/Blocks/BufferBlockTests.cs @@ -61,5 +61,19 @@ public async Task TestMaxMessages() testBlock.Post(2).IsTrue(); testBlock.Post(3).IsFalse("Reach BoundedCapacity"); } + + [Fact] + public async Task TestCompleteAndCancelSending() + { + var testBlock = new BufferBlock(new DataflowBlockOptions() { BoundedCapacity = 1 }); + testBlock.Post(1).IsTrue(); + + var sendTask = testBlock.SendAsync(2); + await sendTask.NeverComplete(); + + testBlock.Complete(); + + (await sendTask.CompleteSoon()).IsFalse(); + } } } diff --git a/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs b/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs index adbc176..d9c856f 100644 --- a/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs +++ b/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs @@ -88,5 +88,18 @@ public async Task TestMaxMessages() transformBlock.Post(2).IsFalse(); } + + [Fact] + public async Task TestCompleteAndCancelSending() + { + var testBlock = new TransformWithoutBufferBlock(x => x); + + var sendTask = testBlock.SendAsync(1); + await sendTask.NeverComplete(); + + testBlock.Complete(); + + (await sendTask.CompleteSoon()).IsFalse(); + } } } diff --git a/tests/BiDaFlow.Tests/TestUtils.cs b/tests/BiDaFlow.Tests/TestUtils.cs index 2c63cdc..67b2d66 100644 --- a/tests/BiDaFlow.Tests/TestUtils.cs +++ b/tests/BiDaFlow.Tests/TestUtils.cs @@ -22,7 +22,15 @@ public static CancellationToken CancelSometimeSoon() public static async Task CompleteSoon(this Task task) { await Task.WhenAny(task, Task.Delay(SometimeSoon)).ConfigureAwait(false); - task.Status.Is(TaskStatus.RanToCompletion); + task.IsCompleted.IsTrue(); + await task; + } + + public static async Task CompleteSoon(this Task task) + { + await Task.WhenAny(task, Task.Delay(SometimeSoon)).ConfigureAwait(false); + task.IsCompleted.IsTrue(); + return await task; } public static async Task CanceledSoon(this Task task)