Skip to content

Commit

Permalink
SupervisedDataflowBlockExtensions.Post, SendAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
azyobuzin committed Apr 28, 2020
1 parent 1dd72fc commit 0737113
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 60 deletions.
70 changes: 53 additions & 17 deletions src/BiDaFlow/Actors/Envelope.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Expand All @@ -7,7 +8,7 @@ namespace BiDaFlow.Actors
{
public class Envelope
{
public Actor Address { get; }
internal Actor Address { get; }
internal Func<Task> Action { get; }

internal Envelope(Actor address, Func<Task> action)
Expand All @@ -34,7 +35,7 @@ public Task<bool> SendAsync()

public class EnvelopeWithReply<TReply>
{
public Actor Address { get; }
internal Actor Address { get; }
internal Func<Task<TReply>> Action { get; }
internal bool HandleErrorByReceiver { get; }

Expand All @@ -48,18 +49,18 @@ internal EnvelopeWithReply(Actor address, Func<Task<TReply>> action, bool handle
public Task<TReply> PostAndReceiveReplyAsync()
{
var tcs = new TaskCompletionSource<TReply>();
var envelope = this.ToEnvelope(tcs, this.HandleErrorByReceiver);
var envelope = this.HandleReply(tcs);

if (!envelope.Post())
tcs.TrySetCanceled();
tcs.TrySetException(new MessageDeclinedException());

return tcs.Task;
}

public Task<TReply> SendAndReceiveReplyAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<TReply>();
var envelope = this.ToEnvelope(tcs, this.HandleErrorByReceiver);
var envelope = this.HandleReply(tcs);

envelope.SendAsync(cancellationToken)
.ContinueWith(
Expand All @@ -69,13 +70,17 @@ public Task<TReply> SendAndReceiveReplyAsync(CancellationToken cancellationToken
{
tcs.TrySetException(t.Exception.InnerExceptions);
}
else if (t.IsCanceled || t.Result == false)
else if (t.IsCanceled)
{
if (cancellationToken.IsCancellationRequested)
tcs.TrySetCanceled(cancellationToken);
else
tcs.TrySetCanceled();
}
else if (t.Result == false)
{
tcs.TrySetException(new MessageDeclinedException());
}
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach,
Expand All @@ -91,12 +96,13 @@ public Task<TReply> SendAndReceiveReplyAsync()

public Envelope DiscardReply()
{
var tcs = new TaskCompletionSource<TReply>();
return this.ToEnvelope(tcs, true);
return this.HandleReply((Action<TReply, Exception?, bool>?)null);
}

private Envelope ToEnvelope(TaskCompletionSource<TReply> tcs, bool handleErrorByReceiver)
internal Envelope HandleReply(Action<TReply, Exception?, bool>? replyHandler)
{
var handleErrorByReceiver = replyHandler == null || this.HandleErrorByReceiver;

return new Envelope(this.Address, () =>
{
Task<TReply> task;
Expand All @@ -108,17 +114,17 @@ private Envelope ToEnvelope(TaskCompletionSource<TReply> tcs, bool handleErrorBy
{
if (handleErrorByReceiver)
{
tcs.TrySetCanceled();
ReplyCanceled();
throw;
}

tcs.TrySetException(ex);
ReplyFault(ex);
return Task.CompletedTask;
}

if (task == null)
{
tcs.TrySetCanceled();
ReplyCanceled();
return Task.CompletedTask;
}

Expand All @@ -129,27 +135,57 @@ private Envelope ToEnvelope(TaskCompletionSource<TReply> tcs, bool handleErrorBy
{
if (handleErrorByReceiver)
{
tcs.TrySetCanceled();
ReplyCanceled();
return t;
}

tcs.TrySetException(t.Exception.InnerExceptions);
ReplyFault(t.Exception);
}
else if (t.IsCanceled)
{
tcs.TrySetCanceled();
ReplyCanceled();
}
else
{
tcs.TrySetResult(t.Result);
replyHandler?.Invoke(t.Result, null, false);
}

return Task.CompletedTask;
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default
).Unwrap();

void ReplyCanceled() => replyHandler?.Invoke(default!, null, true);

void ReplyFault(Exception exception)
{
Debug.Assert(replyHandler != null);
replyHandler!(default!, exception, false);
}
});
}

internal Envelope HandleReply(TaskCompletionSource<TReply> tcs)
{
return this.HandleReply((reply, ex, canceled) =>
{
if (ex != null)
{
if (ex is AggregateException aex)
tcs.TrySetException(aex.InnerExceptions);
else
tcs.TrySetException(ex);
}
else if (canceled)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(reply);
}
});
}
}
Expand Down
33 changes: 15 additions & 18 deletions src/BiDaFlow/Actors/SupervisedDataflowBlock.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BiDaFlow.Internal;
Expand All @@ -20,6 +21,13 @@ internal SupervisedDataflowBlock(Func<Task<T>> startFunc, Func<AggregateExceptio

this._currentBlockSubject.Subscribe(this.SetContinuationToBlock);

this.Completion.ContinueWith(
(_, state) => ((SupervisedDataflowBlock<T>)state)._currentBlockSubject.OnCompleted(),
this,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach,
taskScheduler);

new TaskFactory(taskScheduler).StartNew(this.Restart);
}

Expand All @@ -29,27 +37,16 @@ internal SupervisedDataflowBlock(Func<Task<T>> startFunc, Func<AggregateExceptio

internal IObservable<Optional<T>> CurrentBlockObservable => this._currentBlockSubject;

internal Optional<T> CurrentBlock => this._currentBlockSubject.Value;

internal void EnqueueAction(Action<T> action)
{
IDisposable? unsubscriber = null;
var done = false;

unsubscriber = this._currentBlockSubject
.Subscribe(opt =>
this.CurrentBlockObservable
.Where(x => x.HasValue)
.ReceiveOnce((block, ex, completed) =>
{
if (done)
{
unsubscriber?.Dispose();
return;
}

if (opt.HasValue)
{
done = true;
unsubscriber?.Dispose();

action(opt.Value);
}
if (ex == null && !completed)
action(block.Value);
});
}

Expand Down
93 changes: 73 additions & 20 deletions src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ public static ISourceBlock<TOutput> AsSourceBlock<T, TOutput>(this SupervisedDat
supervisedBlock.TaskScheduler
);

supervisedBlock.CurrentBlockObservable
.Subscribe(opt =>
{
if (!opt.HasValue) return;
opt.Value.LinkTo(helperBlock);
});
if (!supervisedBlock.Completion.IsCompleted)
{
supervisedBlock.CurrentBlockObservable
.Subscribe(opt =>
{
if (!opt.HasValue) return;
opt.Value.LinkTo(helperBlock);
});
}

return sourceBlock;
}
Expand Down Expand Up @@ -94,21 +97,17 @@ public static ITargetBlock<TInput> AsTargetBlock<T, TInput>(this SupervisedDataf
}
else
{
// Complete helperBlock to get OfferMessage to return DecliningPermanently
supervisedBlock.Completion.ContinueWith(
(_, state) => ((IDataflowBlock)state).Complete(),
helperBlock,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
supervisedBlock.TaskScheduler
);

supervisedBlock.CurrentBlockObservable
.Subscribe(opt =>
{
if (!opt.HasValue) return;
helperBlock.LinkTo(opt.Value);
});
.Subscribe(
opt =>
{
if (!opt.HasValue) return;
helperBlock.LinkTo(opt.Value);
},
null,
// Complete helperBlock to get OfferMessage to return DecliningPermanently
helperBlock.Complete
);
}

return targetBlock;
Expand All @@ -120,6 +119,60 @@ public static IPropagatorBlock<TInput, TOutput> AsPropagatorBlock<T, TInput, TOu
return DataflowBlock.Encapsulate(supervisedBlock.AsTargetBlock<T, TInput>(), supervisedBlock.AsSourceBlock<T, TOutput>());
}

public static bool Post<TActor>(this SupervisedDataflowBlock<TActor> supervisedActor, Func<TActor, Envelope> createMessage)
where TActor : Actor
{
var actorOpt = supervisedActor.CurrentBlock;
if (!actorOpt.HasValue) return false;

var actor = actorOpt.Value;
var envelope = createMessage(actor);

if (!ReferenceEquals(envelope.Address, actor))
throw new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor.");

return createMessage(actor).Post();
}

public static Task<bool> SendAsync<TActor>(this SupervisedDataflowBlock<TActor> supervisedActor, Func<TActor, Envelope> createMessage, CancellationToken cancellationToken)
where TActor : Actor
{
if (supervisedActor.Completion.IsCompleted)
return Task.FromResult(false);

var tcs = new TaskCompletionSource<Task<bool>>();

var subscription = supervisedActor.CurrentBlockObservable
.Where(x => x.HasValue)
.ReceiveOnce((actorOpt, ex, completed) =>
{
if (ex != null || completed)
{
tcs.TrySetResult(Task.FromResult(false));
return;
}

var actor = actorOpt.Value;
var envelope = createMessage(actor);

if (!ReferenceEquals(envelope.Address, actor))
{
tcs.TrySetException(new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor."));
return;
}

tcs.TrySetResult(envelope.SendAsync(cancellationToken));
});

cancellationToken.Register(() =>
{
subscription.Dispose();
tcs.TrySetCanceled(cancellationToken);
});

return tcs.Task.Unwrap();
}

// TODO: Actor support
}
}
15 changes: 11 additions & 4 deletions src/BiDaFlow/Internal/BehaviorSubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@ internal sealed class BehaviorSubject<T> : IObservable<T>, IObserver<T>
{
private readonly DoubleLinkedList<IObserver<T>> _subscribers = new DoubleLinkedList<IObserver<T>>();

private T _value;
private bool _isCompleted;
private Exception? _error;

public T Value { get; private set; }

public BehaviorSubject(T initialValue)
{
this.Value = initialValue;
this._value = initialValue;
}

public T Value
{
get
{
lock (this.Lock) return this._value;
}
}

private object Lock => this._subscribers;
Expand All @@ -26,7 +33,7 @@ public void OnNext(T value)
{
if (this._isCompleted) return;

this.Value = value;
this._value = value;
subscriberNode = this._subscribers.First;
}

Expand Down
Loading

0 comments on commit 0737113

Please sign in to comment.