Skip to content

Commit

Permalink
test passing
Browse files Browse the repository at this point in the history
  • Loading branch information
azyobuzin committed Apr 28, 2020
1 parent e408edd commit 90416e1
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 66 deletions.
8 changes: 4 additions & 4 deletions src/BiDaFlow/Actors/Actor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ protected Envelope CreateMessage(Action handler)
});
}

protected EnvelopeWithReply<TReply> CreateMessageWithReply<TReply>(Func<Task<TReply>> handler, bool handleErrorByReceiver = false)
protected EnvelopeWithReply<TReply> CreateMessageWithReply<TReply>(Func<Task<TReply>> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));
return new EnvelopeWithReply<TReply>(this, handler, handleErrorByReceiver);
return new EnvelopeWithReply<TReply>(this, handler);
}

protected EnvelopeWithReply<TReply> CreateMessageWithReply<TReply>(Func<TReply> handler, bool handleErrorByReceiver = false)
protected EnvelopeWithReply<TReply> CreateMessageWithReply<TReply>(Func<TReply> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));
return new EnvelopeWithReply<TReply>(this, () => Task.FromResult(handler()), handleErrorByReceiver);
return new EnvelopeWithReply<TReply>(this, () => Task.FromResult(handler()));
}

void IDataflowBlock.Complete() => this.Complete();
Expand Down
15 changes: 5 additions & 10 deletions src/BiDaFlow/Actors/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,13 @@ public class EnvelopeWithReply<TReply>
{
internal Actor Address { get; }
internal Func<Task<TReply>> Action { get; }
internal bool HandleErrorByReceiver { get; }

internal EnvelopeWithReply(Actor address, Func<Task<TReply>> action, bool handleErrorByReceiver)
internal EnvelopeWithReply(Actor address, Func<Task<TReply>> action)
{
this.Address = address;
this.Action = action;
this.HandleErrorByReceiver = handleErrorByReceiver;
}

private const string DeclinedMessage = "The message was declined by the target block.";
private const string ActorCompletedMessage = "The message was enqueued to the actor but the actor has been completed.";

public Task<TReply> PostAndReceiveReplyAsync()
{
var tcs = new TaskCompletionSource<TReply>();
Expand All @@ -59,7 +54,7 @@ public Task<TReply> PostAndReceiveReplyAsync()
// Throw MessageNeverProcessedException when Address is completed
this.Address.Completion.ContinueWith(
(_, state) => ((TaskCompletionSource<TReply>)state).TrySetException(
new MessageNeverProcessedException(ActorCompletedMessage)),
MessageNeverProcessedException.CreateCompleted()),
tcs,
cts.Token,
TaskContinuationOptions.ExecuteSynchronously,
Expand All @@ -68,7 +63,7 @@ public Task<TReply> PostAndReceiveReplyAsync()
}
else
{
tcs.TrySetException(new MessageNeverProcessedException(DeclinedMessage));
tcs.TrySetException(MessageNeverProcessedException.CreateDeclined());
}

return tcs.Task;
Expand All @@ -94,7 +89,7 @@ public Task<TReply> SendAndReceiveReplyAsync(CancellationToken cancellationToken
{
if (t.Result == false)
{
tcs.TrySetException(new MessageNeverProcessedException(DeclinedMessage));
tcs.TrySetException(MessageNeverProcessedException.CreateDeclined());
return;
}
}
Expand All @@ -107,7 +102,7 @@ public Task<TReply> SendAndReceiveReplyAsync(CancellationToken cancellationToken
// Throw MessageNeverProcessedException when Address is completed
this.Address.Completion.ContinueWith(
(_, state) => ((TaskCompletionSource<TReply>)state).TrySetException(
new MessageNeverProcessedException(ActorCompletedMessage)),
MessageNeverProcessedException.CreateCompleted()),
tcs,
cts.Token,
TaskContinuationOptions.ExecuteSynchronously,
Expand Down
10 changes: 10 additions & 0 deletions src/BiDaFlow/Actors/MessageNeverProcessedException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,15 @@ public MessageNeverProcessedException(string message)
public MessageNeverProcessedException(string message, Exception innerException)
: base(message, innerException)
{ }

internal static MessageNeverProcessedException CreateDeclined()
{
return new MessageNeverProcessedException("The message was declined by the target block.");
}

internal static MessageNeverProcessedException CreateCompleted()
{
return new MessageNeverProcessedException("The message was enqueued to the actor but the actor has been completed.");
}
}
}
22 changes: 17 additions & 5 deletions src/BiDaFlow/Actors/SupervisedBlockExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ public static bool Post<TActor>(this SupervisedBlock<TActor> supervisedActor, Fu
var actor = actorOpt.Value;
var envelope = createMessage(actor);

if (!ReferenceEquals(envelope?.Address, actor))
if (envelope == null)
throw new InvalidOperationException("createMessage returned null.");

if (!ReferenceEquals(envelope.Address, actor))
throw new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor.");

return createMessage(actor).Post();
Expand Down Expand Up @@ -186,6 +189,9 @@ public static Task<bool> SendAsync<TActor>(
var actor = actorOpt.Value;
var envelope = createMessage(actor);

if (envelope == null)
throw new InvalidOperationException("createMessage returned null.");

if (!ReferenceEquals(envelope.Address, actor))
{
tcs.TrySetException(new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor."));
Expand All @@ -205,12 +211,15 @@ public static Task<TReply> PostAndReceiveReplyAsync<TActor, TReply>(this Supervi
if (createMessage == null) throw new ArgumentNullException(nameof(createMessage));

var actorOpt = supervisedActor.CurrentBlock;
if (!actorOpt.HasValue) return Task.FromException<TReply>(new MessageDeclinedException());
if (!actorOpt.HasValue) return Task.FromException<TReply>(MessageNeverProcessedException.CreateDeclined());

var actor = actorOpt.Value;
var envelope = createMessage(actor);

if (!ReferenceEquals(envelope?.Address, actor))
if (envelope == null)
throw new InvalidOperationException("createMessage returned null.");

if (!ReferenceEquals(envelope.Address, actor))
throw new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor.");

return createMessage(actor).PostAndReceiveReplyAsync();
Expand All @@ -226,7 +235,7 @@ public static Task<TReply> SendAndReceiveReplyAsync<TActor, TReply>(
if (createMessage == null) throw new ArgumentNullException(nameof(createMessage));

if (supervisedActor.Completion.IsCompleted)
return Task.FromException<TReply>(new MessageDeclinedException());
return Task.FromException<TReply>(MessageNeverProcessedException.CreateDeclined());

if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled<TReply>(cancellationToken);
Expand All @@ -248,7 +257,7 @@ public static Task<TReply> SendAndReceiveReplyAsync<TActor, TReply>(

if (ex != null || completed)
{
tcs.TrySetException(new MessageDeclinedException());
tcs.TrySetException(MessageNeverProcessedException.CreateDeclined());
return;
}

Expand All @@ -261,6 +270,9 @@ public static Task<TReply> SendAndReceiveReplyAsync<TActor, TReply>(
var actor = actorOpt.Value;
var envelope = createMessage(actor);

if (envelope == null)
throw new InvalidOperationException("createMessage returned null.");

if (!ReferenceEquals(envelope.Address, actor))
{
tcs.TrySetException(new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor."));
Expand Down
7 changes: 4 additions & 3 deletions src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ private void CompleteCore(Exception? exception)
else if (exception != null)
exceptions.Add(exception);

this.ReleaseAllReservations();
this.ReleasePostponedMessages();
}
catch (Exception ex)
{
Expand All @@ -448,11 +448,12 @@ private void CompleteCore(Exception? exception)
}
}

private void ReleaseAllReservations()
private void ReleasePostponedMessages()
{
foreach (var message in this._offeringMessages)
{
if (message.ReservedBy != null)
// https://github.com/dotnet/runtime/blob/89b8591928bcb9f90956c938fcd9fcfb2fdfb476/src/libraries/System.Threading.Tasks.Dataflow/src/Internal/Common.cs#L508-L511
if (message.ReservedBy != null || message.Source.ReserveMessage(message.SourceHeader, this))
{
message.Source.ReleaseReservation(message.SourceHeader, this);
}
Expand Down
53 changes: 10 additions & 43 deletions tests/BiDaFlow.Tests/Actors/ErrorHandlingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,69 +9,36 @@ namespace BiDaFlow.Tests.Actors
public partial class ErrorHandlingTests
{
[Fact]
public void TestThrowToSender()
public async Task TestThrowToSender()
{
var actor = new TestErrorHandlingActor();

var aex = Assert
.Throws<AggregateException>(() => actor.ThrowToSender()
.PostAndReceiveReplyAsync().Wait(TestUtils.CancelSometimeSoon()))
.Flatten();
var ex = await Assert.ThrowsAsync<Exception>(() =>
actor.Throw().PostAndReceiveReplyAsync().CompleteSoon());
ex.Message.Is("test1");

aex.InnerExceptions.Count.Is(1);
aex.InnerException!.Message.Is("test1");
actor.Completion.IsCompleted.IsFalse();
}

[Fact]
public void TestThrowToReceiver()
public async Task TestDiscardReply()
{
var actor = new TestErrorHandlingActor();
actor.Throw().DiscardReply().Post().IsTrue();

// If an exception is thrown to the receiver, the task should be canceled.
var aex = Assert.ThrowsAny<AggregateException>(() =>
actor.ThrowToReceiver().PostAndReceiveReplyAsync().Wait(TestUtils.CancelSometimeSoon()));
aex.InnerExceptions.Count.Is(1);
aex.InnerException!.IsInstanceOf<TaskCanceledException>();

aex = Assert
.Throws<AggregateException>(() => actor.Completion.Wait(TestUtils.CancelSometimeSoon()))
.Flatten();
aex.InnerExceptions.Count.Is(1);
aex.InnerException!.Message.Is("test2");
}

[Fact]
public void TestDiscardReply()
{
var actor = new TestErrorHandlingActor();
actor.ThrowToSender().DiscardReply().Post().IsTrue();

// If the reply is discarded, the exception is thrown to the receiver even if handleErrorByReceiver is false.
var aex = Assert
.Throws<AggregateException>(() => actor.Completion.Wait(TestUtils.CancelSometimeSoon()))
.Flatten();

aex.InnerExceptions.Count.Is(1);
aex.InnerException!.Message.Is("test1");
// If the reply is discarded, the exception is thrown to the receiver.
var ex = await Assert.ThrowsAsync<Exception>(() => actor.Completion.CompleteSoon());
ex.Message.Is("test1");
}

private class TestErrorHandlingActor : Actor
{
public EnvelopeWithReply<int> ThrowToSender()
public EnvelopeWithReply<int> Throw()
{
return this.CreateMessageWithReply(
new Func<int>(() => throw new Exception("test1"))
);
}

public EnvelopeWithReply<int> ThrowToReceiver()
{
return this.CreateMessageWithReply(
new Func<int>(() => throw new Exception("test2")),
true
);
}
}
}
}
14 changes: 14 additions & 0 deletions tests/BiDaFlow.Tests/Blocks/BufferBlockTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,19 @@ public async Task TestMaxMessages()
testBlock.Post(2).IsTrue();
testBlock.Post(3).IsFalse("Reach BoundedCapacity");
}

[Fact]
public async Task TestCompleteAndCancelSending()
{
var testBlock = new BufferBlock<int>(new DataflowBlockOptions() { BoundedCapacity = 1 });
testBlock.Post(1).IsTrue();

var sendTask = testBlock.SendAsync(2);
await sendTask.NeverComplete();

testBlock.Complete();

(await sendTask.CompleteSoon()).IsFalse();
}
}
}
13 changes: 13 additions & 0 deletions tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,18 @@ public async Task TestMaxMessages()

transformBlock.Post(2).IsFalse();
}

[Fact]
public async Task TestCompleteAndCancelSending()
{
var testBlock = new TransformWithoutBufferBlock<int, int>(x => x);

var sendTask = testBlock.SendAsync(1);
await sendTask.NeverComplete();

testBlock.Complete();

(await sendTask.CompleteSoon()).IsFalse();
}
}
}
10 changes: 9 additions & 1 deletion tests/BiDaFlow.Tests/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ public static CancellationToken CancelSometimeSoon()
public static async Task CompleteSoon(this Task task)
{
await Task.WhenAny(task, Task.Delay(SometimeSoon)).ConfigureAwait(false);
task.Status.Is(TaskStatus.RanToCompletion);
task.IsCompleted.IsTrue();
await task;
}

public static async Task<T> CompleteSoon<T>(this Task<T> task)
{
await Task.WhenAny(task, Task.Delay(SometimeSoon)).ConfigureAwait(false);
task.IsCompleted.IsTrue();
return await task;
}

public static async Task CanceledSoon(this Task task)
Expand Down

0 comments on commit 90416e1

Please sign in to comment.