diff --git a/src/BiDaFlow/Actors/Envelope.cs b/src/BiDaFlow/Actors/Envelope.cs index 4e1504b..30d7440 100644 --- a/src/BiDaFlow/Actors/Envelope.cs +++ b/src/BiDaFlow/Actors/Envelope.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -7,7 +8,7 @@ namespace BiDaFlow.Actors { public class Envelope { - public Actor Address { get; } + internal Actor Address { get; } internal Func Action { get; } internal Envelope(Actor address, Func action) @@ -34,7 +35,7 @@ public Task SendAsync() public class EnvelopeWithReply { - public Actor Address { get; } + internal Actor Address { get; } internal Func> Action { get; } internal bool HandleErrorByReceiver { get; } @@ -48,10 +49,10 @@ internal EnvelopeWithReply(Actor address, Func> action, bool handle public Task PostAndReceiveReplyAsync() { var tcs = new TaskCompletionSource(); - var envelope = this.ToEnvelope(tcs, this.HandleErrorByReceiver); + var envelope = this.HandleReply(tcs); if (!envelope.Post()) - tcs.TrySetCanceled(); + tcs.TrySetException(new MessageDeclinedException()); return tcs.Task; } @@ -59,7 +60,7 @@ public Task PostAndReceiveReplyAsync() public Task SendAndReceiveReplyAsync(CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(); - var envelope = this.ToEnvelope(tcs, this.HandleErrorByReceiver); + var envelope = this.HandleReply(tcs); envelope.SendAsync(cancellationToken) .ContinueWith( @@ -69,13 +70,17 @@ public Task SendAndReceiveReplyAsync(CancellationToken cancellationToken { tcs.TrySetException(t.Exception.InnerExceptions); } - else if (t.IsCanceled || t.Result == false) + else if (t.IsCanceled) { if (cancellationToken.IsCancellationRequested) tcs.TrySetCanceled(cancellationToken); else tcs.TrySetCanceled(); } + else if (t.Result == false) + { + tcs.TrySetException(new MessageDeclinedException()); + } }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach, @@ -91,12 +96,13 @@ public Task SendAndReceiveReplyAsync() public Envelope DiscardReply() { - var tcs = new TaskCompletionSource(); - return this.ToEnvelope(tcs, true); + return this.HandleReply((Action?)null); } - private Envelope ToEnvelope(TaskCompletionSource tcs, bool handleErrorByReceiver) + internal Envelope HandleReply(Action? replyHandler) { + var handleErrorByReceiver = replyHandler == null || this.HandleErrorByReceiver; + return new Envelope(this.Address, () => { Task task; @@ -108,17 +114,17 @@ private Envelope ToEnvelope(TaskCompletionSource tcs, bool handleErrorBy { if (handleErrorByReceiver) { - tcs.TrySetCanceled(); + ReplyCanceled(); throw; } - tcs.TrySetException(ex); + ReplyFault(ex); return Task.CompletedTask; } if (task == null) { - tcs.TrySetCanceled(); + ReplyCanceled(); return Task.CompletedTask; } @@ -129,27 +135,57 @@ private Envelope ToEnvelope(TaskCompletionSource tcs, bool handleErrorBy { if (handleErrorByReceiver) { - tcs.TrySetCanceled(); + ReplyCanceled(); return t; } - tcs.TrySetException(t.Exception.InnerExceptions); + ReplyFault(t.Exception); } else if (t.IsCanceled) { - tcs.TrySetCanceled(); + ReplyCanceled(); } else { - tcs.TrySetResult(t.Result); + replyHandler?.Invoke(t.Result, null, false); } return Task.CompletedTask; }, CancellationToken.None, - TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach, + TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default ).Unwrap(); + + void ReplyCanceled() => replyHandler?.Invoke(default!, null, true); + + void ReplyFault(Exception exception) + { + Debug.Assert(replyHandler != null); + replyHandler!(default!, exception, false); + } + }); + } + + internal Envelope HandleReply(TaskCompletionSource tcs) + { + return this.HandleReply((reply, ex, canceled) => + { + if (ex != null) + { + if (ex is AggregateException aex) + tcs.TrySetException(aex.InnerExceptions); + else + tcs.TrySetException(ex); + } + else if (canceled) + { + tcs.TrySetCanceled(); + } + else + { + tcs.TrySetResult(reply); + } }); } } diff --git a/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs b/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs index 014d278..32f6679 100644 --- a/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs +++ b/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using BiDaFlow.Internal; @@ -20,6 +21,13 @@ internal SupervisedDataflowBlock(Func> startFunc, Func ((SupervisedDataflowBlock)state)._currentBlockSubject.OnCompleted(), + this, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.DenyChildAttach, + taskScheduler); + new TaskFactory(taskScheduler).StartNew(this.Restart); } @@ -29,27 +37,16 @@ internal SupervisedDataflowBlock(Func> startFunc, Func> CurrentBlockObservable => this._currentBlockSubject; + internal Optional CurrentBlock => this._currentBlockSubject.Value; + internal void EnqueueAction(Action action) { - IDisposable? unsubscriber = null; - var done = false; - - unsubscriber = this._currentBlockSubject - .Subscribe(opt => + this.CurrentBlockObservable + .Where(x => x.HasValue) + .ReceiveOnce((block, ex, completed) => { - if (done) - { - unsubscriber?.Dispose(); - return; - } - - if (opt.HasValue) - { - done = true; - unsubscriber?.Dispose(); - - action(opt.Value); - } + if (ex == null && !completed) + action(block.Value); }); } diff --git a/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs b/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs index d4dede5..94ba477 100644 --- a/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs +++ b/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs @@ -46,12 +46,15 @@ public static ISourceBlock AsSourceBlock(this SupervisedDat supervisedBlock.TaskScheduler ); - supervisedBlock.CurrentBlockObservable - .Subscribe(opt => - { - if (!opt.HasValue) return; - opt.Value.LinkTo(helperBlock); - }); + if (!supervisedBlock.Completion.IsCompleted) + { + supervisedBlock.CurrentBlockObservable + .Subscribe(opt => + { + if (!opt.HasValue) return; + opt.Value.LinkTo(helperBlock); + }); + } return sourceBlock; } @@ -94,21 +97,17 @@ public static ITargetBlock AsTargetBlock(this SupervisedDataf } else { - // Complete helperBlock to get OfferMessage to return DecliningPermanently - supervisedBlock.Completion.ContinueWith( - (_, state) => ((IDataflowBlock)state).Complete(), - helperBlock, - CancellationToken.None, - TaskContinuationOptions.ExecuteSynchronously, - supervisedBlock.TaskScheduler - ); - supervisedBlock.CurrentBlockObservable - .Subscribe(opt => - { - if (!opt.HasValue) return; - helperBlock.LinkTo(opt.Value); - }); + .Subscribe( + opt => + { + if (!opt.HasValue) return; + helperBlock.LinkTo(opt.Value); + }, + null, + // Complete helperBlock to get OfferMessage to return DecliningPermanently + helperBlock.Complete + ); } return targetBlock; @@ -120,6 +119,60 @@ public static IPropagatorBlock AsPropagatorBlock(), supervisedBlock.AsSourceBlock()); } + public static bool Post(this SupervisedDataflowBlock supervisedActor, Func createMessage) + where TActor : Actor + { + var actorOpt = supervisedActor.CurrentBlock; + if (!actorOpt.HasValue) return false; + + var actor = actorOpt.Value; + var envelope = createMessage(actor); + + if (!ReferenceEquals(envelope.Address, actor)) + throw new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor."); + + return createMessage(actor).Post(); + } + + public static Task SendAsync(this SupervisedDataflowBlock supervisedActor, Func createMessage, CancellationToken cancellationToken) + where TActor : Actor + { + if (supervisedActor.Completion.IsCompleted) + return Task.FromResult(false); + + var tcs = new TaskCompletionSource>(); + + var subscription = supervisedActor.CurrentBlockObservable + .Where(x => x.HasValue) + .ReceiveOnce((actorOpt, ex, completed) => + { + if (ex != null || completed) + { + tcs.TrySetResult(Task.FromResult(false)); + return; + } + + var actor = actorOpt.Value; + var envelope = createMessage(actor); + + if (!ReferenceEquals(envelope.Address, actor)) + { + tcs.TrySetException(new InvalidOperationException("The destination of envelope returned by createMessage is not the specified actor.")); + return; + } + + tcs.TrySetResult(envelope.SendAsync(cancellationToken)); + }); + + cancellationToken.Register(() => + { + subscription.Dispose(); + tcs.TrySetCanceled(cancellationToken); + }); + + return tcs.Task.Unwrap(); + } + // TODO: Actor support } } diff --git a/src/BiDaFlow/Internal/BehaviorSubject.cs b/src/BiDaFlow/Internal/BehaviorSubject.cs index 819cbbb..dc1edc5 100644 --- a/src/BiDaFlow/Internal/BehaviorSubject.cs +++ b/src/BiDaFlow/Internal/BehaviorSubject.cs @@ -6,14 +6,21 @@ internal sealed class BehaviorSubject : IObservable, IObserver { private readonly DoubleLinkedList> _subscribers = new DoubleLinkedList>(); + private T _value; private bool _isCompleted; private Exception? _error; - public T Value { get; private set; } - public BehaviorSubject(T initialValue) { - this.Value = initialValue; + this._value = initialValue; + } + + public T Value + { + get + { + lock (this.Lock) return this._value; + } } private object Lock => this._subscribers; @@ -26,7 +33,7 @@ public void OnNext(T value) { if (this._isCompleted) return; - this.Value = value; + this._value = value; subscriberNode = this._subscribers.First; } diff --git a/src/BiDaFlow/Internal/ObservableExtensions.cs b/src/BiDaFlow/Internal/ObservableExtensions.cs index 11944e2..427f4a8 100644 --- a/src/BiDaFlow/Internal/ObservableExtensions.cs +++ b/src/BiDaFlow/Internal/ObservableExtensions.cs @@ -11,11 +11,93 @@ public static IDisposable Subscribe(this IObservable observable, Action return observable.Subscribe(new DelegateObserver(onNext, null, null)); } - public static IDisposable Subscribe(this IObservable observable, Action? onNext, Action onError, Action? onCompleted) + public static IDisposable Subscribe(this IObservable observable, Action? onNext, Action? onError, Action? onCompleted) { if (observable == null) throw new ArgumentNullException(nameof(observable)); return observable.Subscribe(new DelegateObserver(onNext, onError, onCompleted)); } + + public static IDisposable ReceiveOnce(this IObservable observable, Action action) + { + var lockObj = new object(); + var received = false; + IDisposable? unsubscriber = null; + + var d = observable.Subscribe( + x => + { + if (!CheckReceived()) return; + action(x, null, false); + }, + ex => + { + if (!CheckReceived()) return; + action(default!, ex, false); + }, + () => + { + if (!CheckReceived()) return; + action(default!, null, true); + }); + + lock (lockObj) + { + unsubscriber = d; + } + + if (received) + { + unsubscriber.Dispose(); + return ActionDisposable.Nop; + } + + return unsubscriber; + + bool CheckReceived() + { + lock (lockObj) + { + unsubscriber?.Dispose(); + if (received) return false; + received = true; + } + return true; + } + } + + public static IObservable Where(this IObservable observable, Func predicate) + { + if (observable == null) throw new ArgumentNullException(nameof(observable)); + if (predicate == null) throw new ArgumentNullException(nameof(predicate)); + + return new WhereObservable(observable, predicate); + } + + private sealed class WhereObservable : IObservable + { + private readonly IObservable _observable; + private readonly Func _predicate; + + public WhereObservable(IObservable observable, Func predicate) + { + this._observable = observable; + this._predicate = predicate; + } + + public IDisposable Subscribe(IObserver observer) + { + return this._observable + .Subscribe( + x => + { + if (this._predicate(x)) + observer.OnNext(x); + }, + observer.OnError, + observer.OnCompleted + ); + } + } } } diff --git a/src/BiDaFlow/MessageDeclinedException.cs b/src/BiDaFlow/MessageDeclinedException.cs new file mode 100644 index 0000000..0c56dd3 --- /dev/null +++ b/src/BiDaFlow/MessageDeclinedException.cs @@ -0,0 +1,19 @@ +using System; + +namespace BiDaFlow +{ + public class MessageDeclinedException : Exception + { + public MessageDeclinedException() + : base("The message was declined by the target block.") + { } + + public MessageDeclinedException(string message) + : base(message) + { } + + public MessageDeclinedException(string message, Exception innerException) + : base(message, innerException) + { } + } +}