Skip to content

Commit

Permalink
doc
Browse files Browse the repository at this point in the history
  • Loading branch information
azyobuzin committed May 2, 2020
1 parent 16562f4 commit 362fa18
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<Version>0.1.0</Version>
<LangVersion>8</LangVersion>
<Nullable>enable</Nullable>
<NoWarn>1591</NoWarn>
</PropertyGroup>
</Project>
44 changes: 41 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,46 @@ Battery Included Dataflow Library

## Fluent API for TPL Dataflow

TBD
```csharp
await Enumerable.Range(1, 100).AsSourceBlock()
// RunWith connects to a ITargetBlock and returns a single IDataflowBlock
.RunWith(
new BatchBlock<int>(5, new GroupingDataflowBlockOptions { BoundedCapacity = 5 })
// Chain makes a new IPropagatorBlock linking the upstream and downstream blocks
.Chain(new TransformBlock<int[], int>(
x => x.Sum(),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 4,
MaxDegreeOfParallelism = 4,
}))
// ToTargetBlock makes a new ITargetBlock linking the upstream and downstream blocks
.ToTargetBlock(new ActionBlock<int>(x => Console.WriteLine(x)))
)
.Completion;
```

## Actor-like Programming
## AsyncEnumerable Integration

TBD
```csharp
await AsyncEnumerable.Range(1, 100)
// Process elements in parallel with IPropagatorBlock
.RunThroughDataflowBlock(() =>
new TransformBlock<int, int>(
x => x * 10,
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 6,
MaxDegreeOfParallelism = 4,
SingleProducerConstrained = true,
})
)
// The result is an IAsyncEnumerable
// Subsequent process can be written with System.Linq.Async
.ForEachAsync(x => Console.WriteLine(x));
```

# Roadmap

- BiDaFlow.Actors - a lightweight actor-like programming framework with IDataflowBlock
- BiDaFlow.ReactiveStreams - integration with [Reactive Streams](https://github.com/reactive-streams/reactive-streams-dotnet), TPL Dataflow and AsyncEnumerable
12 changes: 6 additions & 6 deletions src/BiDaFlow.AsyncEnumerable/Fluent/DataflowAsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ namespace BiDaFlow.Fluent
{
public static class DataflowAsyncEnumerable
{
public static ISourceBlock<T> AsSourceBlock<T>(this IAsyncEnumerable<T> enumerable, CancellationToken cancellationToken = default)
public static ISourceBlock<TOutput> AsSourceBlock<TOutput>(this IAsyncEnumerable<TOutput> enumerable, CancellationToken cancellationToken = default)
{
if (enumerable == null) throw new ArgumentNullException(nameof(enumerable));

return new AsyncEnumerableSourceBlock<T>(enumerable, null, cancellationToken);
return new AsyncEnumerableSourceBlock<TOutput>(enumerable, null, cancellationToken);
}

public static ISourceBlock<T> AsSourceBlock<T>(this IAsyncEnumerable<T> enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken)
public static ISourceBlock<TOutput> AsSourceBlock<TOutput>(this IAsyncEnumerable<TOutput> enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
if (enumerable == null) throw new ArgumentNullException(nameof(enumerable));
if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler));

return new AsyncEnumerableSourceBlock<T>(enumerable, taskScheduler, cancellationToken);
return new AsyncEnumerableSourceBlock<TOutput>(enumerable, taskScheduler, cancellationToken);
}

public static IAsyncEnumerable<T> AsAsyncEnumerable<T>(this ISourceBlock<T> source)
public static IAsyncEnumerable<TOutput> AsAsyncEnumerable<TOutput>(this ISourceBlock<TOutput> source)
{
return new SourceBlockAsyncEnumerable<T>(source ?? throw new ArgumentNullException(nameof(source)));
return new SourceBlockAsyncEnumerable<TOutput>(source ?? throw new ArgumentNullException(nameof(source)));
}

public static IAsyncEnumerable<TOutput> RunThroughDataflowBlock<TInput, TOutput>(
Expand Down
27 changes: 27 additions & 0 deletions src/BiDaFlow/Blocks/ToTargetBlockBlock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace BiDaFlow.Blocks
{
internal sealed class ToTargetBlockBlock<T> : ITargetBlock<T>
{
private readonly ITargetBlock<T> _sourceBlock;
private readonly IDataflowBlock _targetBlock;

public ToTargetBlockBlock(ITargetBlock<T> sourceBlock, IDataflowBlock targetBlock)
{
this._sourceBlock = sourceBlock;
this._targetBlock = targetBlock;
}

public Task Completion => this._targetBlock.Completion;

public void Complete() => this._sourceBlock.Complete();

public void Fault(Exception exception) => this._sourceBlock.Fault(exception);

public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
=> this._sourceBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}
67 changes: 67 additions & 0 deletions src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace BiDaFlow.Blocks
{
/// <summary>
/// Provides a dataflow block like <seealso cref="TransformBlock{TInput, TOutput}"/>.
/// This block does not consume items from source blocks until offering a message to link targets succeeds.
/// </summary>
public class TransformWithoutBufferBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>
{
private readonly Func<TInput, TOutput> _transform;
Expand All @@ -21,6 +25,25 @@ public class TransformWithoutBufferBlock<TInput, TOutput> : IPropagatorBlock<TIn
private long _nextId;
private readonly LinkedList<OfferingMessage> _offeringMessages = new LinkedList<OfferingMessage>(); // TODO: more efficient structure

/// <summary>
/// Initializes a new <see cref="TransformWithoutBufferBlock{TInput, TOutput}"/>.
/// </summary>
///
/// <param name="transform">
/// A transform function.
/// <para>Note that this function should be pure because it can be called multiple times for the same item.</para>
/// </param>
///
/// <param name="taskScheduler">A <see cref="TaskScheduler"/> used by offering messages.</param>
///
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> to monitor for cancellation requests.
/// <para>When cancellation is requested, this block behaves like when <see cref="Complete"/> is called.</para>
/// </param>
///
/// <exception cref="ArgumentNullException">
/// <paramref name="transform"/> or <paramref name="taskScheduler"/> is <see langword="null"/>.
/// </exception>
public TransformWithoutBufferBlock(Func<TInput, TOutput> transform, TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
this._transform = transform ?? throw new ArgumentNullException(nameof(transform));
Expand All @@ -42,9 +65,44 @@ public TransformWithoutBufferBlock(Func<TInput, TOutput> transform, TaskSchedule
this.Completion.ContinueWith(this.HandleCompletion, taskScheduler);
}

/// <summary>
/// Initializes a new <see cref="TransformWithoutBufferBlock{TInput, TOutput}"/>.
/// </summary>
///
/// <param name="transform">
/// A transform function.
/// <para>Note that this function should be pure because it can be called multiple times for the same item.</para>
/// </param>
///
/// <exception cref="ArgumentNullException"><paramref name="transform"/> is <see langword="null"/>.</exception>
///
/// <remarks>
/// This overload calls <see cref="TransformWithoutBufferBlock{TInput, TOutput}.TransformWithoutBufferBlock(Func{TInput, TOutput}, TaskScheduler, CancellationToken)"/>
/// with <see cref="TaskScheduler.Default"/> and <see cref="CancellationToken.None"/>.
/// </remarks>
public TransformWithoutBufferBlock(Func<TInput, TOutput> transform)
: this(transform, TaskScheduler.Default, CancellationToken.None) { }

/// <summary>
/// Initializes a new <see cref="TransformWithoutBufferBlock{TInput, TOutput}"/>.
/// </summary>
///
/// <param name="transform">
/// A transform function.
/// <para>Note that this function should be pure because it can be called multiple times for the same item.</para>
/// </param>
///
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> to monitor for cancellation requests.
/// <para>When cancellation is requested, this block behaves like when <see cref="Complete"/> is called.</para>
/// </param>
///
/// <exception cref="ArgumentNullException"><paramref name="transform"/> is <see langword="null"/>.</exception>
///
/// <remarks>
/// This overload calls <see cref="TransformWithoutBufferBlock{TInput, TOutput}.TransformWithoutBufferBlock(Func{TInput, TOutput}, TaskScheduler, CancellationToken)"/>
/// with <see cref="TaskScheduler.Default"/>.
/// </remarks>
public TransformWithoutBufferBlock(Func<TInput, TOutput> transform, CancellationToken cancellationToken)
: this(transform, TaskScheduler.Default, cancellationToken) { }

Expand All @@ -58,8 +116,16 @@ public TransformWithoutBufferBlock(Func<TInput, TOutput> transform, Cancellation
/// </summary>
private object OfferLock => this._offeringMessages;

/// <inheritdoc cref="IDataflowBlock.Completion"/>
public Task Completion => this._tcs.Task;

/// <summary>
/// Signals to the block to stop consuming and offering messages.
/// </summary>
/// <remarks>
/// When this method is called, <see cref="Completion"/> will immediately be completed
/// because this block has no buffer.
/// </remarks>
public void Complete()
{
this.CompleteCore(null);
Expand All @@ -72,6 +138,7 @@ void IDataflowBlock.Fault(Exception exception)
this.CompleteCore(exception);
}

/// <inheritdoc cref="ISourceBlock{TOutput}.LinkTo(ITargetBlock{TOutput}, DataflowLinkOptions)"/>
public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
{
if (target == null) throw new ArgumentNullException(nameof(target));
Expand Down
62 changes: 44 additions & 18 deletions src/BiDaFlow/Fluent/FluentDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ public static IDisposable PropagateCompletion(this IDataflowBlock source, IDataf
return source.PropagateCompletion(target, WhenPropagate.Both);
}

/// <summary>
/// Links <paramref name="source"/> to <paramref name="target"/> to notify completion of <paramref name="source"/> to <paramref name="target"/>.
/// </summary>
/// <returns>An <see cref="IDisposable"/> to unlink the source from the target.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="target"/> is <see langword="null"/>.
/// </exception>
public static IDisposable PropagateCompletion(this IDataflowBlock source, IDataflowBlock target, WhenPropagate when)
{
if (source == null) throw new ArgumentNullException(nameof(source));
Expand Down Expand Up @@ -64,6 +71,14 @@ void SendCompletion()

private static readonly DataflowLinkOptions s_propagateCompletionOptions = new DataflowLinkOptions() { PropagateCompletion = true };

/// <summary>
/// Links the <see cref="ISourceBlock{TOutput}"/> to the specified <see cref="ITargetBlock{TInput}"/> propagating completion.
/// </summary>
/// <remarks>
/// This method is the same as <c>DataflowBlock.LinkTo(source, target, new DataflowLinkOptions() { PropagateCompletion = true })</c>.
/// </remarks>
/// <seealso cref="DataflowBlock.LinkTo{TOutput}(ISourceBlock{TOutput}, ITargetBlock{TOutput})"/>
/// <inheritdoc cref="DataflowBlock.LinkTo{TOutput}(ISourceBlock{TOutput}, ITargetBlock{TOutput})"/>
public static IDisposable LinkWithCompletion<TOutput>(this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target)
{
if (source == null) throw new ArgumentNullException(nameof(source));
Expand All @@ -72,45 +87,56 @@ public static IDisposable LinkWithCompletion<TOutput>(this ISourceBlock<TOutput>
return source.LinkTo(target, s_propagateCompletionOptions);
}

public static ISourceBlock<T> CompletedSourceBlock<T>()
/// <summary>
/// Returns a <see cref="ISourceBlock{TOutput}"/> that have been completed.
/// </summary>
public static ISourceBlock<TOutput> CompletedSourceBlock<TOutput>()
{
return CompletedSourceBlockHolder<T>.Instance;
return CompletedSourceBlockHolder<TOutput>.Instance;
}

public static ISourceBlock<T> AsSourceBlock<T>(this IEnumerable<T> enumerable, CancellationToken cancellationToken = default)
public static ISourceBlock<TOutput> AsSourceBlock<TOutput>(this IEnumerable<TOutput> enumerable, CancellationToken cancellationToken = default)
{
if (enumerable == null) throw new ArgumentNullException(nameof(enumerable));

return new EnumerableSourceBlock<T>(enumerable, null, cancellationToken);
return new EnumerableSourceBlock<TOutput>(enumerable, null, cancellationToken);
}

public static ISourceBlock<T> AsSourceBlock<T>(this IEnumerable<T> enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken)
public static ISourceBlock<TOutput> AsSourceBlock<TOutput>(this IEnumerable<TOutput> enumerable, TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
if (enumerable == null) throw new ArgumentNullException(nameof(enumerable));
if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler));

return new EnumerableSourceBlock<T>(enumerable, taskScheduler, cancellationToken);
return new EnumerableSourceBlock<TOutput>(enumerable, taskScheduler, cancellationToken);
}

public static ISourceBlock<T> ToSourceBlock<T>(this IEnumerator<T> enumerator, CancellationToken cancellationToken = default)
public static ISourceBlock<TOutput> ToSourceBlock<TOutput>(this IEnumerator<TOutput> enumerator, CancellationToken cancellationToken = default)
{
if (enumerator == null) throw new ArgumentNullException(nameof(enumerator));

return new EnumerableSourceBlock<T>(enumerator, null, cancellationToken);
return new EnumerableSourceBlock<TOutput>(enumerator, null, cancellationToken);
}

public static ISourceBlock<T> ToSourceBlock<T>(this IEnumerator<T> enumerator, TaskScheduler taskScheduler, CancellationToken cancellationToken)
public static ISourceBlock<TOutput> ToSourceBlock<TOutput>(this IEnumerator<TOutput> enumerator, TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
if (enumerator == null) throw new ArgumentNullException(nameof(enumerator));
if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler));

return new EnumerableSourceBlock<T>(enumerator, taskScheduler, cancellationToken);
return new EnumerableSourceBlock<TOutput>(enumerator, taskScheduler, cancellationToken);
}

public static IObserver<T> AsObserverDroppingOverflowItems<T>(this ITargetBlock<T> target)
/// <summary>
/// Creates a new <see cref="IObserver{T}"/> abstraction over the <see cref="ITargetBlock{TInput}"/>.
/// This method drops overflow items in contrast to <seealso cref="DataflowBlock.AsObserver{TInput}(ITargetBlock{TInput})"/>,
/// which buffers overflow items with <see cref="DataflowBlock.SendAsync{TInput}(ITargetBlock{TInput}, TInput)"/>.
/// </summary>
/// <param name="target">The target to wrap</param>
/// <returns>An oberver that wraps the target block.</returns>
/// <exception cref="ArgumentNullException"><paramref name="target"/> is <see langword="null"/>.</exception>
public static IObserver<TInput> AsObserverDroppingOverflowItems<TInput>(this ITargetBlock<TInput> target)
{
if (target == null) throw new ArgumentNullException(nameof(target));
return new DroppingObserver<T>(target);
return new DroppingObserver<TInput>(target);
}

public static IPropagatorBlock<TInput, TOutput> Chain<TInput, TSourceOutput, TOutput>(
Expand All @@ -130,7 +156,7 @@ public static ITargetBlock<TInput> ToTargetBlock<TInput, TOutput>(this IPropagat
if (target == null) throw new ArgumentNullException(nameof(target));

source.LinkWithCompletion(target);
return source;
return new ToTargetBlockBlock<TInput>(source, target);
}

public static IDataflowBlock RunWith<T>(this ISourceBlock<T> source, ITargetBlock<T> target)
Expand All @@ -142,7 +168,7 @@ public static IDataflowBlock RunWith<T>(this ISourceBlock<T> source, ITargetBloc
return new RunWithBlock(source, target);
}

public static ISourceBlock<T> Merge<T>(IEnumerable<ISourceBlock<T>> sources)
public static ISourceBlock<TOutput> Merge<TOutput>(IEnumerable<ISourceBlock<TOutput>> sources)
{
if (sources == null) throw new ArgumentNullException(nameof(sources));

Expand All @@ -158,9 +184,9 @@ public static ISourceBlock<T> Merge<T>(IEnumerable<ISourceBlock<T>> sources)
);

var workingCount = sourceList.Count;
if (workingCount == 0) return CompletedSourceBlock<T>();
if (workingCount == 0) return CompletedSourceBlock<TOutput>();

var resultBlock = new TransformWithoutBufferBlock<T, T>(IdentityFunc<T>.Instance);
var resultBlock = new TransformWithoutBufferBlock<TOutput, TOutput>(IdentityFunc<TOutput>.Instance);

foreach (var source in sourceList)
{
Expand All @@ -187,9 +213,9 @@ public static ISourceBlock<T> Merge<T>(IEnumerable<ISourceBlock<T>> sources)
return resultBlock;
}

public static ISourceBlock<T> Merge<T>(params ISourceBlock<T>[] sources)
public static ISourceBlock<TOutput> Merge<TOutput>(params ISourceBlock<TOutput>[] sources)
{
return Merge((IEnumerable<ISourceBlock<T>>)sources);
return Merge((IEnumerable<ISourceBlock<TOutput>>)sources);
}

public static IPropagatorBlock<TInput, TOutput> Merge<TInput, TOutput>(this IPropagatorBlock<TInput, TOutput> propagator, IEnumerable<ISourceBlock<TOutput>> sources)
Expand Down
9 changes: 9 additions & 0 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project>
<PropertyGroup>
<Version>0.1.0</Version>
<AssemblyVersion>0.1.0.0</AssemblyVersion>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<Import Project="..\Directory.Build.props" />
</Project>

0 comments on commit 362fa18

Please sign in to comment.