diff --git a/src/BiDaFlow/Actors/ActorExtensions.cs b/src/BiDaFlow/Actors/ActorExtensions.cs index 31bf3a8..77be562 100644 --- a/src/BiDaFlow/Actors/ActorExtensions.cs +++ b/src/BiDaFlow/Actors/ActorExtensions.cs @@ -8,16 +8,18 @@ namespace BiDaFlow.Actors public static class ActorExtensions { public static ITargetBlock AsTargetBlock(this TActor actor, Func createMessage) - where TActor : IActor + where TActor : Actor { if (actor == null) throw new ArgumentNullException(nameof(actor)); if (createMessage == null) throw new ArgumentNullException(nameof(createMessage)); + var iactor = (IActor)actor; + var transformBlock = new TransformWithoutBufferBlock( x => createMessage(actor, x), - actor.Engine.TaskScheduler, - actor.Engine.CancellationToken); - transformBlock.LinkWithCompletion(actor.Engine.Target); + iactor.Engine.TaskScheduler, + iactor.Engine.CancellationToken); + transformBlock.LinkWithCompletion(iactor.Engine.Target); return transformBlock; } diff --git a/src/BiDaFlow/Actors/Envelope.cs b/src/BiDaFlow/Actors/Envelope.cs index 09078c8..87fbfa0 100644 --- a/src/BiDaFlow/Actors/Envelope.cs +++ b/src/BiDaFlow/Actors/Envelope.cs @@ -7,10 +7,10 @@ namespace BiDaFlow.Actors { public class Envelope { - public IActor Address { get; } + public Actor Address { get; } internal Func Action { get; } - internal Envelope(IActor address, Func action) + internal Envelope(Actor address, Func action) { this.Address = address; this.Action = action; @@ -18,12 +18,12 @@ internal Envelope(IActor address, Func action) public bool Post() { - return this.Address.Engine.Target.Post(this); + return ((IActor)this.Address).Engine.Target.Post(this); } public Task SendAsync(CancellationToken cancellationToken) { - return this.Address.Engine.Target.SendAsync(this, cancellationToken); + return ((IActor)this.Address).Engine.Target.SendAsync(this, cancellationToken); } public Task SendAsync() @@ -34,11 +34,11 @@ public Task SendAsync() public class EnvelopeWithReply { - public IActor Address { get; } + public Actor Address { get; } internal Func> Action { get; } internal bool HandleErrorByReceiver { get; } - internal EnvelopeWithReply(IActor address, Func> action, bool handleErrorByReceiver) + internal EnvelopeWithReply(Actor address, Func> action, bool handleErrorByReceiver) { this.Address = address; this.Action = action; diff --git a/src/BiDaFlow/Actors/IActor.cs b/src/BiDaFlow/Actors/IActor.cs index 663d0c9..a022f17 100644 --- a/src/BiDaFlow/Actors/IActor.cs +++ b/src/BiDaFlow/Actors/IActor.cs @@ -2,6 +2,7 @@ namespace BiDaFlow.Actors { + // TODO: 廃止 public interface IActor : IDataflowBlock { ActorEngine Engine { get; } diff --git a/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs b/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs index 986cfac..f58716c 100644 --- a/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs +++ b/src/BiDaFlow/Actors/SupervisedDataflowBlock.cs @@ -10,8 +10,8 @@ public class SupervisedDataflowBlock : IDataflowBlock where T : class, IDataf { private readonly Func> _startFunc; private readonly Func> _rescueFunc; - private readonly TaskScheduler _taskScheduler; - private readonly TaskCompletionSource _tcs; + internal readonly TaskScheduler _taskScheduler; + private readonly TaskCompletionSource _tcs; private bool _started; private T? _currentBlock; private readonly Queue> _actionQueue = new Queue>(); @@ -21,7 +21,7 @@ internal SupervisedDataflowBlock(Func> startFunc, Func(); + this._tcs = new TaskCompletionSource(); new TaskFactory(taskScheduler).StartNew(this.Restart); } diff --git a/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs b/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs index c225d3f..f2ade88 100644 --- a/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs +++ b/src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs @@ -1,11 +1,34 @@ using System; using System.Collections.Generic; +using System.Runtime.CompilerServices; using System.Text; +using System.Threading; +using System.Threading.Tasks.Dataflow; +using BiDaFlow.Blocks; namespace BiDaFlow.Actors { public static class SupervisedDataflowBlockExtensions { + private static readonly ConditionalWeakTable> s_sourceBlocks = new ConditionalWeakTable>(); + private static readonly ConditionalWeakTable> s_targetBlocks = new ConditionalWeakTable>(); + + public static ISourceBlock AsSourceBlock(this SupervisedDataflowBlock supervisedBlock) + where T : class, ISourceBlock + { + throw new NotImplementedException(); + + var dic = s_sourceBlocks.GetOrCreateValue(supervisedBlock); + + lock(dic) + { + if (dic.TryGetValue(typeof(TSource), out var obj)) + return (ISourceBlock)obj; + + var block = new TransformWithoutBufferBlock(x => x, supervisedBlock._taskScheduler, CancellationToken.None); + } + } + // TODO: AsSourceBlock, AsTargetBlock (and Actor), AsPropagatorBlock } } diff --git a/src/BiDaFlow/BiDaFlow.csproj b/src/BiDaFlow/BiDaFlow.csproj index 4050ca5..e8b5307 100644 --- a/src/BiDaFlow/BiDaFlow.csproj +++ b/src/BiDaFlow/BiDaFlow.csproj @@ -6,7 +6,6 @@ - diff --git a/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs b/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs index 0755eee..eac811d 100644 --- a/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs +++ b/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs @@ -58,6 +58,8 @@ public void Complete() } this._tcs.TrySetResult(default); + + // TODO: Release reservations } void IDataflowBlock.Fault(Exception exception) @@ -290,6 +292,7 @@ DataflowMessageStatus ITargetBlock.OfferMessage(DataflowMessageHeader me if (linkedTarget.IsLinked) { + // FIXME: If target is LinkTarget, source cannot be null var status = linkedTarget.Target.OfferMessage(myHeader, transformedValue, source == null ? null : this, consumeToAccept); switch (status) @@ -375,7 +378,10 @@ private void OfferToTargets() goto StartConsume; case DataflowMessageStatus.Postponed: - return; + // If the message has been reserved, offer no more + if (messageNode.Value.ReservedBy != null) + return; + break; case DataflowMessageStatus.DecliningPermanently: linkedTarget.Unlink(); diff --git a/src/BiDaFlow/Fluent/FluentDataflow.cs b/src/BiDaFlow/Fluent/FluentDataflow.cs index ddd4524..3b83b5a 100644 --- a/src/BiDaFlow/Fluent/FluentDataflow.cs +++ b/src/BiDaFlow/Fluent/FluentDataflow.cs @@ -82,6 +82,7 @@ public static ISourceBlock AsSourceBlock(this IEnumerable enumerable, C { while (!cancellationToken.IsCancellationRequested && enumerator.MoveNext()) { + // TODO: more efficient implementation var accepted = await block.SendAsync(enumerator.Current, cancellationToken); if (!accepted) return; } @@ -182,7 +183,7 @@ public static IPropagatorBlock Merge(this IPro if (propagator == null) throw new ArgumentNullException(nameof(propagator)); if (sources == null) throw new ArgumentNullException(nameof(sources)); - var mergedSource = Merge(sources.Prepend(propagator)); + var mergedSource = Merge(new[] { propagator }.Concat(sources)); return DataflowBlock.Encapsulate(propagator, mergedSource); } diff --git a/src/BiDaFlow/Internal/LinkedTarget2.cs b/src/BiDaFlow/Internal/LinkedTarget2.cs new file mode 100644 index 0000000..672234f --- /dev/null +++ b/src/BiDaFlow/Internal/LinkedTarget2.cs @@ -0,0 +1,65 @@ +using System; +using System.Threading; +using System.Threading.Tasks.Dataflow; + +namespace BiDaFlow.Internal +{ + internal sealed class LinkedTarget2 + { + public ITargetBlock Target { get; } + + private readonly bool _propagateCompletion; + private readonly Action>? _unlinkCallback; + private int _remainingMessages; + private int _unlinked; + + public LinkedTarget2(ITargetBlock target, int maxMessages, bool propagateCompletion, Action>? unlinkCallback) + { + if (maxMessages <= 0 && maxMessages != DataflowBlockOptions.Unbounded) + throw new ArgumentOutOfRangeException(nameof(maxMessages)); + + this.Target = target ?? throw new ArgumentNullException(nameof(target)); + this._remainingMessages = maxMessages; + this._propagateCompletion = propagateCompletion; + this._unlinkCallback = unlinkCallback; + } + + public bool Unlinked => this._unlinked != 0; + + public void DecrementRemainingMessages() + { + if (this._remainingMessages == DataflowBlockOptions.Unbounded) + return; + + var newValue = this._remainingMessages - 1; + if (newValue < 0) throw new InvalidOperationException("newValue is " + newValue); + + this._remainingMessages = newValue; + + if (newValue == 0) this.Unlink(); + } + + public void Complete(Exception? exception) + { + if (!this._propagateCompletion || this.Unlinked) + return; + + if (exception == null) + { + this.Target.Complete(); + } + else + { + this.Target.Fault(exception); + } + } + + public void Unlink() + { + if (Interlocked.Exchange(ref this._unlinked, 1) == 0) + { + this._unlinkCallback?.Invoke(this); + } + } + } +} diff --git a/src/BiDaFlow/Internal/SourceCore.cs b/src/BiDaFlow/Internal/SourceCore.cs new file mode 100644 index 0000000..6195150 --- /dev/null +++ b/src/BiDaFlow/Internal/SourceCore.cs @@ -0,0 +1,353 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace BiDaFlow.Internal +{ + internal class SourceCore + { + private readonly ISourceBlock _parent; + private readonly TaskFactory _taskFactory; + private readonly Action? _readyToNextItemCallback; + private readonly LinkedList> _links = new LinkedList>(); + private readonly Dictionary, LinkedListNode>> _targetToLinkTable = new Dictionary, LinkedListNode>>(); + + private long _messageId = 1; + private bool _itemAvailable; + private T _offeringItem = default!; + private ITargetBlock? _resevedBy; + private bool _calledReadyCallback; + + private bool _completed; + + public SourceCore(ISourceBlock parent, TaskScheduler taskScheduler, Action? readyToNextItemCallback) + { + this._parent = parent ?? throw new ArgumentNullException(nameof(parent)); + this._taskFactory = new TaskFactory(taskScheduler ?? throw new ArgumentNullException(nameof(taskScheduler))); + this._readyToNextItemCallback = readyToNextItemCallback; + + parent.Completion.ContinueWith(this.HandleCompletion, taskScheduler); + } + + private object Lock => this._links; // any readonly object + + public bool ConsumeToAccept { get; set; } + + public void OfferItem(T item, bool? consumeToAccept = null) + { + lock (this.Lock) + { + this._offeringItem = item; + this._itemAvailable = true; + this._resevedBy = null; + } + + this.OfferToTargets(consumeToAccept ?? this.ConsumeToAccept); + } + + public void DismissItem() + { + lock (this.Lock) + { + this._offeringItem = default!; + this._itemAvailable = false; + this._calledReadyCallback = true; + } + } + + public IDisposable LinkTo(ITargetBlock target, DataflowLinkOptions? linkOptions) + { + if (target == null) throw new ArgumentNullException(nameof(target)); + linkOptions ??= new DataflowLinkOptions(); + + if (target.Completion.IsCompleted || linkOptions.MaxMessages == 0) + return ActionDisposable.Nop; + + LinkedTarget2 linkedTarget; + var callCallback = false; + + lock (this.Lock) + { + if (this._completed) + { + var exception = this._parent.Completion.Exception; + if (exception == null) + target.Complete(); + else + target.Fault(exception); + + return ActionDisposable.Nop; + } + + linkedTarget = new LinkedTarget2(target, linkOptions.MaxMessages, linkOptions.PropagateCompletion, this.HandleUnlink); + + if (linkOptions.Append) + { + var node = this._links.AddLast(linkedTarget); + if (!this._targetToLinkTable.ContainsKey(target)) + this._targetToLinkTable.Add(target, node); + } + else + { + var node = this._links.AddFirst(linkedTarget); + this._targetToLinkTable[target] = node; + } + + if (!this._itemAvailable && !this._calledReadyCallback) + { + this._calledReadyCallback = true; + callCallback = true; + } + } + + if (callCallback) this._readyToNextItemCallback?.Invoke(); + + this.OfferToTargets(this.ConsumeToAccept); + + return new ActionDisposable(linkedTarget.Unlink); + } + + public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock target, out bool messageConsumed) + { + if (!messageHeader.IsValid) throw new ArgumentException("messageHeader is not valid."); + if (target == null) throw new ArgumentNullException(nameof(target)); + + T output; + + lock (this.Lock) + { + if (this._messageId != messageHeader.Id || (this._resevedBy != null && this._resevedBy != target)) + { + messageConsumed = false; + return default!; + } + + if (this._resevedBy != null) + { + this._resevedBy = null; + } + + messageConsumed = this._itemAvailable; + this._itemAvailable = false; + this._resevedBy = null; + this._messageId++; + output = this._offeringItem; + + if (messageConsumed && this._targetToLinkTable.TryGetValue(target, out var linkedTargetNode)) + linkedTargetNode.Value.DecrementRemainingMessages(); + + // If the link has reached to MaxMessages, HandleUnlink has been called in DecrementRemainingMessages. + // Here we can check whether a link is left. + this._calledReadyCallback = this._links.Count > 0; + } + + if (this._calledReadyCallback) this._readyToNextItemCallback?.Invoke(); + + return output; + } + + public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock target) + { + if (!messageHeader.IsValid) throw new ArgumentException("messageHeader is not valid."); + if (target == null) throw new ArgumentNullException(nameof(target)); + + lock (this.Lock) + { + if (this._resevedBy == null && this._messageId == messageHeader.Id) + { + this._resevedBy = target; + return true; + } + } + + return false; + } + + public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock target) + { + if (!messageHeader.IsValid) throw new ArgumentException("messageHeader is not valid."); + if (target == null) throw new ArgumentNullException(nameof(target)); + + lock (this.Lock) + { + if (this._resevedBy == target && this._messageId == messageHeader.Id) + { + this._resevedBy = null; + } + else + { + throw new InvalidOperationException("The message has not been reserved by the target."); + } + } + + this.OfferToTargets(this.ConsumeToAccept); + } + + private void HandleUnlink(LinkedTarget2 linkedTarget) + { + if (linkedTarget == null) throw new ArgumentNullException(nameof(linkedTarget)); + + lock (this.Lock) + { + if (this._completed) return; + + // Remove from the list of linked targets + var node = this._targetToLinkTable[linkedTarget.Target]; + if (node.Value == linkedTarget) + { + var nextSameTargetNode = node.Next; + for (; nextSameTargetNode != null; nextSameTargetNode = nextSameTargetNode.Next) + { + if (Equals(nextSameTargetNode.Value.Target, linkedTarget.Target)) + break; + } + + this._links.Remove(node); + + if (nextSameTargetNode == null) + { + var removed = this._targetToLinkTable.Remove(linkedTarget.Target); + Debug.Assert(removed); + + // Release reservation + if (Equals(this._resevedBy, linkedTarget.Target)) + { + this._resevedBy = null; + this.OfferToTargets(this.ConsumeToAccept); + } + } + else + { + this._targetToLinkTable[linkedTarget.Target] = nextSameTargetNode; + } + } + else + { + this._links.Remove(linkedTarget); + } + } + } + + private void HandleCompletion(Task completionTask) + { + lock (this.Lock) + { + if (this._completed) return; + this._completed = true; + } + + // There is no writer after setting true to _completed. + + foreach (var x in this._links) + x.Complete(completionTask.Exception); + } + + private void OfferToTargets(bool consumeToAccept) + { + lock (this.Lock) + { + if (!this.CanOffer()) return; + } + + this._taskFactory.StartNew( + consumeToAccept + ? (Action)(state => ((SourceCore)state).OfferCoreConsumeToAccept()) + : state => ((SourceCore)state).OfferCore(), + this + ); + } + + private void OfferCore() + { + StartOffer: + lock (this.Lock) + { + if (!CanOffer()) return; + + var header = new DataflowMessageHeader(this._messageId); + + for (var node = this._links.First; node != null;) + { + var linkedTarget = node.Value; + var nextNode = node.Next; + + Debug.Assert(!linkedTarget.Unlinked); + + var status = linkedTarget.Target.OfferMessage(header, this._offeringItem, this._parent, false); + + switch (status) + { + case DataflowMessageStatus.Accepted: + this._itemAvailable = false; + this._messageId++; + linkedTarget.DecrementRemainingMessages(); + + // If the link has reached to MaxMessages, HandleUnlink has been called in DecrementRemainingMessages. + // Here we can check whether a link is left. + this._calledReadyCallback = !this._itemAvailable && this._links.Count > 0; + + if (this._calledReadyCallback) + goto CallReadyCallback; + else + goto StartOffer; + + case DataflowMessageStatus.NotAvailable: + throw new InvalidOperationException("the target returns NotAvailable even though consumeToAccept is false."); + + case DataflowMessageStatus.DecliningPermanently: + linkedTarget.Unlink(); + break; + } + + node = nextNode; + } + } + + CallReadyCallback: + this._readyToNextItemCallback?.Invoke(); + goto StartOffer; + } + + private void OfferCoreConsumeToAccept() + { + StartOffer: + DataflowMessageHeader messageHeader; + T messageValue; + + lock (this.Lock) + { + if (!this.CanOffer()) return; + + messageHeader = new DataflowMessageHeader(this._messageId); + messageValue = this._offeringItem; + } + + // We can offer without lock because ConsumeMessage will lock and checke the state. + + for (var node = this._links.First; node != null;) + { + var linkedTarget = node.Value; + var nextNode = node.Next; + + var status = linkedTarget.Target.OfferMessage(messageHeader, messageValue, this._parent, true); + + switch (status) + { + case DataflowMessageStatus.Accepted: + case DataflowMessageStatus.NotAvailable: + goto StartOffer; + + case DataflowMessageStatus.DecliningPermanently: + linkedTarget.Unlink(); + break; + } + + node = nextNode; + } + } + + private bool CanOffer() => this._itemAvailable && this._resevedBy == null; + } +} diff --git a/tests/BiDaFlow.Tests/Actors/ActorTests.TestOnCompleted.cs b/tests/BiDaFlow.Tests/Actors/ActorTests.TestOnCompleted.cs index d6b776e..5ab5df2 100644 --- a/tests/BiDaFlow.Tests/Actors/ActorTests.TestOnCompleted.cs +++ b/tests/BiDaFlow.Tests/Actors/ActorTests.TestOnCompleted.cs @@ -49,7 +49,7 @@ public void Stop() this.Complete(); } - protected override async ValueTask OnCompleted(AggregateException? exception) + protected override async Task OnCompleted(AggregateException? exception) { await this.SendOutputAsync(42); } @@ -57,7 +57,7 @@ protected override async ValueTask OnCompleted(AggregateException? exception) private class TestThrowOnCompletedActor : Actor { - protected override ValueTask OnCompleted(AggregateException? exception) + protected override Task OnCompleted(AggregateException? exception) { throw new Exception("thrown by OnCompleted"); } diff --git a/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs b/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs index 8f068bc..d228a8c 100644 --- a/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs +++ b/tests/BiDaFlow.Tests/Blocks/TransformWithoutBufferBlockTests.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks.Dataflow; using BiDaFlow.Blocks; using BiDaFlow.Fluent; @@ -50,5 +51,42 @@ public void TestCancel() transformBlock.Completion.Wait(TestUtils.CancelSometimeSoon()); } + + [Fact] + public void TestLink() + { + var transformBlock = new TransformWithoutBufferBlock(x => x); + + var target1 = new BufferBlock(new DataflowBlockOptions() { BoundedCapacity = 1 }); + var target2 = new BufferBlock(new DataflowBlockOptions() { BoundedCapacity = 1 }); + var target3 = new BufferBlock(new DataflowBlockOptions() { BoundedCapacity = 1 }); + + transformBlock.LinkTo(target1); + transformBlock.LinkTo(target2, x => x != 2); + transformBlock.LinkTo(target3); + + transformBlock.Post(1).IsTrue(); + transformBlock.Post(2).IsTrue(); + transformBlock.Post(3).IsTrue(); + + target1.Receive(TestUtils.CancelSometimeSoon()).Is(1); + target2.Receive(TestUtils.CancelSometimeSoon()).Is(3); + target3.Receive(TestUtils.CancelSometimeSoon()).Is(2); + } + + [Fact] + public void TestMaxMessages() + { + var transformBlock = new TransformWithoutBufferBlock(x => x); + var targetBlock = new BufferBlock(); + transformBlock.LinkTo(targetBlock, new DataflowLinkOptions() { MaxMessages = 1, PropagateCompletion = true }); + + transformBlock.Post(1).IsTrue(); + + targetBlock.Receive(TestUtils.CancelSometimeSoon()).Is(1); + Assert.ThrowsAny(() => targetBlock.Completion.Wait(TestUtils.CancelSometimeSoon())); + + transformBlock.Post(2).IsFalse(); + } } }