Skip to content

Commit

Permalink
Something big commit
Browse files Browse the repository at this point in the history
  • Loading branch information
azyobuzin committed Apr 27, 2020
1 parent 2721177 commit efbeca6
Show file tree
Hide file tree
Showing 12 changed files with 507 additions and 19 deletions.
10 changes: 6 additions & 4 deletions src/BiDaFlow/Actors/ActorExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ namespace BiDaFlow.Actors
public static class ActorExtensions
{
public static ITargetBlock<TInput> AsTargetBlock<TActor, TInput>(this TActor actor, Func<TActor, TInput, Envelope?> createMessage)
where TActor : IActor
where TActor : Actor
{
if (actor == null) throw new ArgumentNullException(nameof(actor));
if (createMessage == null) throw new ArgumentNullException(nameof(createMessage));

var iactor = (IActor)actor;

var transformBlock = new TransformWithoutBufferBlock<TInput, Envelope?>(
x => createMessage(actor, x),
actor.Engine.TaskScheduler,
actor.Engine.CancellationToken);
transformBlock.LinkWithCompletion(actor.Engine.Target);
iactor.Engine.TaskScheduler,
iactor.Engine.CancellationToken);
transformBlock.LinkWithCompletion(iactor.Engine.Target);

return transformBlock;
}
Expand Down
12 changes: 6 additions & 6 deletions src/BiDaFlow/Actors/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ namespace BiDaFlow.Actors
{
public class Envelope
{
public IActor Address { get; }
public Actor Address { get; }
internal Func<Task> Action { get; }

internal Envelope(IActor address, Func<Task> action)
internal Envelope(Actor address, Func<Task> action)
{
this.Address = address;
this.Action = action;
}

public bool Post()
{
return this.Address.Engine.Target.Post(this);
return ((IActor)this.Address).Engine.Target.Post(this);
}

public Task<bool> SendAsync(CancellationToken cancellationToken)
{
return this.Address.Engine.Target.SendAsync(this, cancellationToken);
return ((IActor)this.Address).Engine.Target.SendAsync(this, cancellationToken);
}

public Task<bool> SendAsync()
Expand All @@ -34,11 +34,11 @@ public Task<bool> SendAsync()

public class EnvelopeWithReply<TReply>
{
public IActor Address { get; }
public Actor Address { get; }
internal Func<Task<TReply>> Action { get; }
internal bool HandleErrorByReceiver { get; }

internal EnvelopeWithReply(IActor address, Func<Task<TReply>> action, bool handleErrorByReceiver)
internal EnvelopeWithReply(Actor address, Func<Task<TReply>> action, bool handleErrorByReceiver)
{
this.Address = address;
this.Action = action;
Expand Down
1 change: 1 addition & 0 deletions src/BiDaFlow/Actors/IActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace BiDaFlow.Actors
{
// TODO: 廃止
public interface IActor : IDataflowBlock
{
ActorEngine Engine { get; }
Expand Down
6 changes: 3 additions & 3 deletions src/BiDaFlow/Actors/SupervisedDataflowBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public class SupervisedDataflowBlock<T> : IDataflowBlock where T : class, IDataf
{
private readonly Func<Task<T>> _startFunc;
private readonly Func<AggregateException?, Task<RescueAction>> _rescueFunc;
private readonly TaskScheduler _taskScheduler;
private readonly TaskCompletionSource<ValueTask> _tcs;
internal readonly TaskScheduler _taskScheduler;
private readonly TaskCompletionSource<ValueTuple> _tcs;
private bool _started;
private T? _currentBlock;
private readonly Queue<Action<T>> _actionQueue = new Queue<Action<T>>();
Expand All @@ -21,7 +21,7 @@ internal SupervisedDataflowBlock(Func<Task<T>> startFunc, Func<AggregateExceptio
this._startFunc = startFunc ?? throw new ArgumentNullException(nameof(startFunc));
this._rescueFunc = rescueFunc ?? throw new ArgumentNullException(nameof(rescueFunc));
this._taskScheduler = taskScheduler ?? throw new ArgumentNullException(nameof(taskScheduler));
this._tcs = new TaskCompletionSource<ValueTask>();
this._tcs = new TaskCompletionSource<ValueTuple>();

new TaskFactory(taskScheduler).StartNew(this.Restart);
}
Expand Down
23 changes: 23 additions & 0 deletions src/BiDaFlow/Actors/SupervisedDataflowBlockExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using BiDaFlow.Blocks;

namespace BiDaFlow.Actors
{
public static class SupervisedDataflowBlockExtensions
{
private static readonly ConditionalWeakTable<object, Dictionary<Type, object>> s_sourceBlocks = new ConditionalWeakTable<object, Dictionary<Type, object>>();
private static readonly ConditionalWeakTable<object, Dictionary<Type, object>> s_targetBlocks = new ConditionalWeakTable<object, Dictionary<Type, object>>();

public static ISourceBlock<TSource> AsSourceBlock<T, TSource>(this SupervisedDataflowBlock<T> supervisedBlock)
where T : class, ISourceBlock<TSource>
{
throw new NotImplementedException();

var dic = s_sourceBlocks.GetOrCreateValue(supervisedBlock);

lock(dic)
{
if (dic.TryGetValue(typeof(TSource), out var obj))
return (ISourceBlock<TSource>)obj;

var block = new TransformWithoutBufferBlock<TSource, TSource>(x => x, supervisedBlock._taskScheduler, CancellationToken.None);
}
}

// TODO: AsSourceBlock, AsTargetBlock (and Actor), AsPropagatorBlock
}
}
1 change: 0 additions & 1 deletion src/BiDaFlow/BiDaFlow.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

<ItemGroup>
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.6.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.2" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard1.3'">
Expand Down
8 changes: 7 additions & 1 deletion src/BiDaFlow/Blocks/TransformWithoutBufferBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public void Complete()
}

this._tcs.TrySetResult(default);

// TODO: Release reservations
}

void IDataflowBlock.Fault(Exception exception)
Expand Down Expand Up @@ -290,6 +292,7 @@ DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader me

if (linkedTarget.IsLinked)
{
// FIXME: If target is LinkTarget, source cannot be null
var status = linkedTarget.Target.OfferMessage(myHeader, transformedValue, source == null ? null : this, consumeToAccept);

switch (status)
Expand Down Expand Up @@ -375,7 +378,10 @@ private void OfferToTargets()
goto StartConsume;

case DataflowMessageStatus.Postponed:
return;
// If the message has been reserved, offer no more
if (messageNode.Value.ReservedBy != null)
return;
break;

case DataflowMessageStatus.DecliningPermanently:
linkedTarget.Unlink();
Expand Down
3 changes: 2 additions & 1 deletion src/BiDaFlow/Fluent/FluentDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public static ISourceBlock<T> AsSourceBlock<T>(this IEnumerable<T> enumerable, C
{
while (!cancellationToken.IsCancellationRequested && enumerator.MoveNext())
{
// TODO: more efficient implementation
var accepted = await block.SendAsync(enumerator.Current, cancellationToken);
if (!accepted) return;
}
Expand Down Expand Up @@ -182,7 +183,7 @@ public static IPropagatorBlock<TInput, TOutput> Merge<TInput, TOutput>(this IPro
if (propagator == null) throw new ArgumentNullException(nameof(propagator));
if (sources == null) throw new ArgumentNullException(nameof(sources));

var mergedSource = Merge(sources.Prepend(propagator));
var mergedSource = Merge(new[] { propagator }.Concat(sources));
return DataflowBlock.Encapsulate(propagator, mergedSource);
}

Expand Down
65 changes: 65 additions & 0 deletions src/BiDaFlow/Internal/LinkedTarget2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace BiDaFlow.Internal
{
internal sealed class LinkedTarget2<T>
{
public ITargetBlock<T> Target { get; }

private readonly bool _propagateCompletion;
private readonly Action<LinkedTarget2<T>>? _unlinkCallback;
private int _remainingMessages;
private int _unlinked;

public LinkedTarget2(ITargetBlock<T> target, int maxMessages, bool propagateCompletion, Action<LinkedTarget2<T>>? unlinkCallback)
{
if (maxMessages <= 0 && maxMessages != DataflowBlockOptions.Unbounded)
throw new ArgumentOutOfRangeException(nameof(maxMessages));

this.Target = target ?? throw new ArgumentNullException(nameof(target));
this._remainingMessages = maxMessages;
this._propagateCompletion = propagateCompletion;
this._unlinkCallback = unlinkCallback;
}

public bool Unlinked => this._unlinked != 0;

public void DecrementRemainingMessages()
{
if (this._remainingMessages == DataflowBlockOptions.Unbounded)
return;

var newValue = this._remainingMessages - 1;
if (newValue < 0) throw new InvalidOperationException("newValue is " + newValue);

this._remainingMessages = newValue;

if (newValue == 0) this.Unlink();
}

public void Complete(Exception? exception)
{
if (!this._propagateCompletion || this.Unlinked)
return;

if (exception == null)
{
this.Target.Complete();
}
else
{
this.Target.Fault(exception);
}
}

public void Unlink()
{
if (Interlocked.Exchange(ref this._unlinked, 1) == 0)
{
this._unlinkCallback?.Invoke(this);
}
}
}
}
Loading

0 comments on commit efbeca6

Please sign in to comment.