From 362fa18d0965e50e49c9a36038353e0e76d35068 Mon Sep 17 00:00:00 2001 From: azyobuzin Date: Sat, 2 May 2020 22:26:13 +0900 Subject: [PATCH] doc --- Directory.Build.props | 2 +- README.md | 44 +++++++++++- .../Fluent/DataflowAsyncEnumerable.cs | 12 ++-- src/BiDaFlow/Blocks/ToTargetBlockBlock.cs | 27 ++++++++ .../Blocks/TransformWithoutBufferBlock.cs | 67 +++++++++++++++++++ src/BiDaFlow/Fluent/FluentDataflow.cs | 62 ++++++++++++----- src/Directory.Build.props | 9 +++ 7 files changed, 195 insertions(+), 28 deletions(-) create mode 100644 src/BiDaFlow/Blocks/ToTargetBlockBlock.cs create mode 100644 src/Directory.Build.props diff --git a/Directory.Build.props b/Directory.Build.props index 18e8ef1..0322a43 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 0.1.0 8 enable + 1591 diff --git a/README.md b/README.md index 857e4de..d40f51c 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,46 @@ Battery Included Dataflow Library ## Fluent API for TPL Dataflow -TBD +```csharp +await Enumerable.Range(1, 100).AsSourceBlock() + // RunWith connects to a ITargetBlock and returns a single IDataflowBlock + .RunWith( + new BatchBlock(5, new GroupingDataflowBlockOptions { BoundedCapacity = 5 }) + // Chain makes a new IPropagatorBlock linking the upstream and downstream blocks + .Chain(new TransformBlock( + x => x.Sum(), + new ExecutionDataflowBlockOptions + { + BoundedCapacity = 4, + MaxDegreeOfParallelism = 4, + })) + // ToTargetBlock makes a new ITargetBlock linking the upstream and downstream blocks + .ToTargetBlock(new ActionBlock(x => Console.WriteLine(x))) + ) + .Completion; +``` -## Actor-like Programming +## AsyncEnumerable Integration -TBD +```csharp +await AsyncEnumerable.Range(1, 100) + // Process elements in parallel with IPropagatorBlock + .RunThroughDataflowBlock(() => + new TransformBlock( + x => x * 10, + new ExecutionDataflowBlockOptions + { + BoundedCapacity = 6, + MaxDegreeOfParallelism = 4, + SingleProducerConstrained = true, + }) + ) + // The result is an IAsyncEnumerable + // Subsequent process can be written with System.Linq.Async + .ForEachAsync(x => Console.WriteLine(x)); +``` + +# Roadmap + +- BiDaFlow.Actors - a lightweight actor-like programming framework with IDataflowBlock +- BiDaFlow.ReactiveStreams - integration with [Reactive Streams](https://github.com/reactive-streams/reactive-streams-dotnet), TPL Dataflow and AsyncEnumerable diff --git a/src/BiDaFlow.AsyncEnumerable/Fluent/DataflowAsyncEnumerable.cs b/src/BiDaFlow.AsyncEnumerable/Fluent/DataflowAsyncEnumerable.cs index b637104..2242c85 100644 --- a/src/BiDaFlow.AsyncEnumerable/Fluent/DataflowAsyncEnumerable.cs +++ b/src/BiDaFlow.AsyncEnumerable/Fluent/DataflowAsyncEnumerable.cs @@ -9,24 +9,24 @@ namespace BiDaFlow.Fluent { public static class DataflowAsyncEnumerable { - public static ISourceBlock AsSourceBlock(this IAsyncEnumerable enumerable, CancellationToken cancellationToken = default) + public static ISourceBlock AsSourceBlock(this IAsyncEnumerable enumerable, CancellationToken cancellationToken = default) { if (enumerable == null) throw new ArgumentNullException(nameof(enumerable)); - return new AsyncEnumerableSourceBlock(enumerable, null, cancellationToken); + return new AsyncEnumerableSourceBlock(enumerable, null, cancellationToken); } - public static ISourceBlock AsSourceBlock(this IAsyncEnumerable enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken) + public static ISourceBlock AsSourceBlock(this IAsyncEnumerable enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken) { if (enumerable == null) throw new ArgumentNullException(nameof(enumerable)); if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler)); - return new AsyncEnumerableSourceBlock(enumerable, taskScheduler, cancellationToken); + return new AsyncEnumerableSourceBlock(enumerable, taskScheduler, cancellationToken); } - public static IAsyncEnumerable AsAsyncEnumerable(this ISourceBlock source) + public static IAsyncEnumerable AsAsyncEnumerable(this ISourceBlock source) { - return new SourceBlockAsyncEnumerable(source ?? throw new ArgumentNullException(nameof(source))); + return new SourceBlockAsyncEnumerable(source ?? throw new ArgumentNullException(nameof(source))); } public static IAsyncEnumerable RunThroughDataflowBlock( diff --git a/src/BiDaFlow/Blocks/ToTargetBlockBlock.cs b/src/BiDaFlow/Blocks/ToTargetBlockBlock.cs new file mode 100644 index 0000000..41f5651 --- /dev/null +++ b/src/BiDaFlow/Blocks/ToTargetBlockBlock.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace BiDaFlow.Blocks +{ + internal sealed class ToTargetBlockBlock : ITargetBlock + { + private readonly ITargetBlock _sourceBlock; + private readonly IDataflowBlock _targetBlock; + + public ToTargetBlockBlock(ITargetBlock sourceBlock, IDataflowBlock targetBlock) + { + this._sourceBlock = sourceBlock; + this._targetBlock = targetBlock; + } + + public Task Completion => this._targetBlock.Completion; + + public void Complete() => this._sourceBlock.Complete(); + + public void Fault(Exception exception) => this._sourceBlock.Fault(exception); + + public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock source, bool consumeToAccept) + => this._sourceBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept); + } +} diff --git a/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs b/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs index 05d6b9f..a6f7ba4 100644 --- a/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs +++ b/src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs @@ -8,6 +8,10 @@ namespace BiDaFlow.Blocks { + /// + /// Provides a dataflow block like . + /// This block does not consume items from source blocks until offering a message to link targets succeeds. + /// public class TransformWithoutBufferBlock : IPropagatorBlock { private readonly Func _transform; @@ -21,6 +25,25 @@ public class TransformWithoutBufferBlock : IPropagatorBlock _offeringMessages = new LinkedList(); // TODO: more efficient structure + /// + /// Initializes a new . + /// + /// + /// + /// A transform function. + /// Note that this function should be pure because it can be called multiple times for the same item. + /// + /// + /// A used by offering messages. + /// + /// + /// A to monitor for cancellation requests. + /// When cancellation is requested, this block behaves like when is called. + /// + /// + /// + /// or is . + /// public TransformWithoutBufferBlock(Func transform, TaskScheduler taskScheduler, CancellationToken cancellationToken) { this._transform = transform ?? throw new ArgumentNullException(nameof(transform)); @@ -42,9 +65,44 @@ public TransformWithoutBufferBlock(Func transform, TaskSchedule this.Completion.ContinueWith(this.HandleCompletion, taskScheduler); } + /// + /// Initializes a new . + /// + /// + /// + /// A transform function. + /// Note that this function should be pure because it can be called multiple times for the same item. + /// + /// + /// is . + /// + /// + /// This overload calls + /// with and . + /// public TransformWithoutBufferBlock(Func transform) : this(transform, TaskScheduler.Default, CancellationToken.None) { } + /// + /// Initializes a new . + /// + /// + /// + /// A transform function. + /// Note that this function should be pure because it can be called multiple times for the same item. + /// + /// + /// + /// A to monitor for cancellation requests. + /// When cancellation is requested, this block behaves like when is called. + /// + /// + /// is . + /// + /// + /// This overload calls + /// with . + /// public TransformWithoutBufferBlock(Func transform, CancellationToken cancellationToken) : this(transform, TaskScheduler.Default, cancellationToken) { } @@ -58,8 +116,16 @@ public TransformWithoutBufferBlock(Func transform, Cancellation /// private object OfferLock => this._offeringMessages; + /// public Task Completion => this._tcs.Task; + /// + /// Signals to the block to stop consuming and offering messages. + /// + /// + /// When this method is called, will immediately be completed + /// because this block has no buffer. + /// public void Complete() { this.CompleteCore(null); @@ -72,6 +138,7 @@ void IDataflowBlock.Fault(Exception exception) this.CompleteCore(exception); } + /// public IDisposable LinkTo(ITargetBlock target, DataflowLinkOptions linkOptions) { if (target == null) throw new ArgumentNullException(nameof(target)); diff --git a/src/BiDaFlow/Fluent/FluentDataflow.cs b/src/BiDaFlow/Fluent/FluentDataflow.cs index d3a41b5..284d455 100644 --- a/src/BiDaFlow/Fluent/FluentDataflow.cs +++ b/src/BiDaFlow/Fluent/FluentDataflow.cs @@ -16,6 +16,13 @@ public static IDisposable PropagateCompletion(this IDataflowBlock source, IDataf return source.PropagateCompletion(target, WhenPropagate.Both); } + /// + /// Links to to notify completion of to . + /// + /// An to unlink the source from the target. + /// + /// or is . + /// public static IDisposable PropagateCompletion(this IDataflowBlock source, IDataflowBlock target, WhenPropagate when) { if (source == null) throw new ArgumentNullException(nameof(source)); @@ -64,6 +71,14 @@ void SendCompletion() private static readonly DataflowLinkOptions s_propagateCompletionOptions = new DataflowLinkOptions() { PropagateCompletion = true }; + /// + /// Links the to the specified propagating completion. + /// + /// + /// This method is the same as DataflowBlock.LinkTo(source, target, new DataflowLinkOptions() { PropagateCompletion = true }). + /// + /// + /// public static IDisposable LinkWithCompletion(this ISourceBlock source, ITargetBlock target) { if (source == null) throw new ArgumentNullException(nameof(source)); @@ -72,45 +87,56 @@ public static IDisposable LinkWithCompletion(this ISourceBlock return source.LinkTo(target, s_propagateCompletionOptions); } - public static ISourceBlock CompletedSourceBlock() + /// + /// Returns a that have been completed. + /// + public static ISourceBlock CompletedSourceBlock() { - return CompletedSourceBlockHolder.Instance; + return CompletedSourceBlockHolder.Instance; } - public static ISourceBlock AsSourceBlock(this IEnumerable enumerable, CancellationToken cancellationToken = default) + public static ISourceBlock AsSourceBlock(this IEnumerable enumerable, CancellationToken cancellationToken = default) { if (enumerable == null) throw new ArgumentNullException(nameof(enumerable)); - return new EnumerableSourceBlock(enumerable, null, cancellationToken); + return new EnumerableSourceBlock(enumerable, null, cancellationToken); } - public static ISourceBlock AsSourceBlock(this IEnumerable enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken) + public static ISourceBlock AsSourceBlock(this IEnumerable enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken) { if (enumerable == null) throw new ArgumentNullException(nameof(enumerable)); if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler)); - return new EnumerableSourceBlock(enumerable, taskScheduler, cancellationToken); + return new EnumerableSourceBlock(enumerable, taskScheduler, cancellationToken); } - public static ISourceBlock ToSourceBlock(this IEnumerator enumerator, CancellationToken cancellationToken = default) + public static ISourceBlock ToSourceBlock(this IEnumerator enumerator, CancellationToken cancellationToken = default) { if (enumerator == null) throw new ArgumentNullException(nameof(enumerator)); - return new EnumerableSourceBlock(enumerator, null, cancellationToken); + return new EnumerableSourceBlock(enumerator, null, cancellationToken); } - public static ISourceBlock ToSourceBlock(this IEnumerator enumerator, TaskScheduler taskScheduler, CancellationToken cancellationToken) + public static ISourceBlock ToSourceBlock(this IEnumerator enumerator, TaskScheduler taskScheduler, CancellationToken cancellationToken) { if (enumerator == null) throw new ArgumentNullException(nameof(enumerator)); if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler)); - return new EnumerableSourceBlock(enumerator, taskScheduler, cancellationToken); + return new EnumerableSourceBlock(enumerator, taskScheduler, cancellationToken); } - public static IObserver AsObserverDroppingOverflowItems(this ITargetBlock target) + /// + /// Creates a new abstraction over the . + /// This method drops overflow items in contrast to , + /// which buffers overflow items with . + /// + /// The target to wrap + /// An oberver that wraps the target block. + /// is . + public static IObserver AsObserverDroppingOverflowItems(this ITargetBlock target) { if (target == null) throw new ArgumentNullException(nameof(target)); - return new DroppingObserver(target); + return new DroppingObserver(target); } public static IPropagatorBlock Chain( @@ -130,7 +156,7 @@ public static ITargetBlock ToTargetBlock(this IPropagat if (target == null) throw new ArgumentNullException(nameof(target)); source.LinkWithCompletion(target); - return source; + return new ToTargetBlockBlock(source, target); } public static IDataflowBlock RunWith(this ISourceBlock source, ITargetBlock target) @@ -142,7 +168,7 @@ public static IDataflowBlock RunWith(this ISourceBlock source, ITargetBloc return new RunWithBlock(source, target); } - public static ISourceBlock Merge(IEnumerable> sources) + public static ISourceBlock Merge(IEnumerable> sources) { if (sources == null) throw new ArgumentNullException(nameof(sources)); @@ -158,9 +184,9 @@ public static ISourceBlock Merge(IEnumerable> sources) ); var workingCount = sourceList.Count; - if (workingCount == 0) return CompletedSourceBlock(); + if (workingCount == 0) return CompletedSourceBlock(); - var resultBlock = new TransformWithoutBufferBlock(IdentityFunc.Instance); + var resultBlock = new TransformWithoutBufferBlock(IdentityFunc.Instance); foreach (var source in sourceList) { @@ -187,9 +213,9 @@ public static ISourceBlock Merge(IEnumerable> sources) return resultBlock; } - public static ISourceBlock Merge(params ISourceBlock[] sources) + public static ISourceBlock Merge(params ISourceBlock[] sources) { - return Merge((IEnumerable>)sources); + return Merge((IEnumerable>)sources); } public static IPropagatorBlock Merge(this IPropagatorBlock propagator, IEnumerable> sources) diff --git a/src/Directory.Build.props b/src/Directory.Build.props new file mode 100644 index 0000000..d5c77c8 --- /dev/null +++ b/src/Directory.Build.props @@ -0,0 +1,9 @@ + + + 0.1.0 + 0.1.0.0 + true + + + +